Flink - state
Published: 2019-06-20


public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

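Flink has three kinds of state: key/value state (KvState), function state, and operator state.

As the fields show, StreamTaskState is a wrapper around all three.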

1. KVState

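This is the most basic kind of state.

It is modeled as a pair of interfaces, KvState and KvStateSnapshot.

Each converts into the other: KvState.snapshot() produces a KvStateSnapshot, and KvStateSnapshot.restoreState() produces a KvState.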

/**
 * Key/Value state implementation for user-defined state. The state is backed by a state
 * backend, which typically follows one of the following patterns: Either the state is stored
 * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the
 * state backend into some store (during checkpoints), or the key/value state is in fact backed
 * by an external key/value store as the state backend, and checkpoints merely record the
 * metadata of what is considered part of the checkpoint.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <S> The type of {@link State} this {@code KvState} holds.
 * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
 * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
 */
public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {

    /**
     * Sets the current key, which will be used when using the state access methods.
     *
     * @param key The key.
     */
    void setCurrentKey(K key);

    /**
     * Sets the current namespace, which will be used when using the state access methods.
     *
     * @param namespace The namespace.
     */
    void setCurrentNamespace(N namespace);

    /**
     * Creates a snapshot of this state.
     *
     * @param checkpointId The ID of the checkpoint for which the snapshot should be created.
     * @param timestamp The timestamp of the checkpoint.
     * @return A snapshot handle for this key/value state.
     *
     * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system
     *                   can react to failed snapshots.
     */
    KvStateSnapshot<K, N, S, SD, Backend> snapshot(long checkpointId, long timestamp) throws Exception;

    /**
     * Disposes the key/value state, releasing all occupied resources.
     */
    void dispose();
}
The definition is fairly simple. The key method is snapshot(), which produces a KvStateSnapshot.
public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> extends StateObject {

    /**
     * Loads the key/value state back from this snapshot.
     *
     * @param stateBackend The state backend that created this snapshot and can restore the key/value state
     *                     from this snapshot.
     * @param keySerializer The serializer for the keys.
     * @param classLoader The class loader for user-defined types.
     *
     * @return An instance of the key/value state loaded from this snapshot.
     *
     * @throws Exception Exceptions can occur during the state loading and are forwarded.
     */
    KvState<K, N, S, SD, Backend> restoreState(
            Backend stateBackend,
            TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception;
}

KvStateSnapshot is the counterpart of KvState. Its key method is restoreState().

Take a concrete implementation, FsState, as an example:
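The snapshot/restore pairing can be sketched with two toy classes. This is only an illustration of the round trip, not Flink code: ToyKvState and ToyKvStateSnapshot are hypothetical names, the five type parameters are dropped, and the "backend" is just an in-memory copy.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for KvState (hypothetical): live, mutable state that can produce a snapshot. */
final class ToyKvState {
    private final Map<String, Integer> values;

    ToyKvState(Map<String, Integer> initial) {
        // Copy so the live state never shares a map with a snapshot.
        this.values = new HashMap<>(initial);
    }

    void put(String key, int value) {
        values.put(key, value);
    }

    Integer get(String key) {
        return values.get(key);
    }

    /** Freezes a copy of the current contents, tagged with the checkpoint id. */
    ToyKvStateSnapshot snapshot(long checkpointId) {
        return new ToyKvStateSnapshot(checkpointId, new HashMap<>(values));
    }
}

/** Toy stand-in for KvStateSnapshot (hypothetical): frozen data that can rebuild live state. */
final class ToyKvStateSnapshot {
    final long checkpointId;
    private final Map<String, Integer> frozen;

    ToyKvStateSnapshot(long checkpointId, Map<String, Integer> frozen) {
        this.checkpointId = checkpointId;
        this.frozen = frozen;
    }

    /** Builds a fresh live state from the frozen contents. */
    ToyKvState restoreState() {
        return new ToyKvState(frozen);
    }
}
```

Updates made to the live state after snapshot() do not leak into the snapshot, so restoreState() always returns the contents as of the checkpoint.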

public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

AbstractFsState extends AbstractHeapState because an FsState also keeps its state cached on the heap. It only writes to a file when a snapshot is taken.

So let's look at AbstractHeapState first:

/**
 * Base class for partitioned {@link ListState} implementations that are backed by a regular
 * heap hash map. The concrete implementations define how the state is checkpointed.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <SV> The type of the values in the state.
 * @param <S> The type of State
 * @param <SD> The type of StateDescriptor for the State S
 * @param <Backend> The type of the backend that snapshots this key/value state.
 */
public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
        implements KvState<K, N, S, SD, Backend>, State {

    /** Map containing the actual key/value pairs */
    protected final HashMap<N, Map<K, SV>> state; // note the extra namespace level here, which keeps keys from colliding too easily

    /** Serializer for the state value. The state value could be a List<V>, for example. */
    protected final TypeSerializer<SV> stateSerializer;

    /** The serializer for the keys */
    protected final TypeSerializer<K> keySerializer;

    /** The serializer for the namespace */
    protected final TypeSerializer<N> namespaceSerializer;

    /** This holds the name of the state and can create an initial default value for the state. */
    protected final SD stateDesc; // the StateDescriptor carries information about the state, such as its default value

    /** The current key, which the next value methods will refer to */
    protected K currentKey;

    /** The current namespace, which the access methods will refer to. */
    protected N currentNamespace = null;

    /** Cache the state map for the current key. */
    protected Map<K, SV> currentNSState;

    /**
     * Creates a new empty key/value state.
     *
     * @param keySerializer The serializer for the keys.
     * @param namespaceSerializer The serializer for the namespace.
     * @param stateDesc The state identifier for the state. This contains name
     *                  and can create a default state value.
     */
    protected AbstractHeapState(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<SV> stateSerializer,
            SD stateDesc) {
        this(keySerializer, namespaceSerializer, stateSerializer, stateDesc, new HashMap<N, Map<K, SV>>());
    }
 
AbstractFsState
public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {

    /** The file system state backend backing snapshots of this state */
    private final FsStateBackend backend;

    public abstract KvStateSnapshot<K, N, S, SD, FsStateBackend> createHeapSnapshot(Path filePath);

    @Override
    public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
        try (FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(checkpointId, timestamp)) {

            // serialize the state to the output stream
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));
            outView.writeInt(state.size());
            for (Map.Entry<N, Map<K, SV>> namespaceState: state.entrySet()) {
                N namespace = namespaceState.getKey();
                namespaceSerializer.serialize(namespace, outView);
                outView.writeInt(namespaceState.getValue().size());
                for (Map.Entry<K, SV> entry: namespaceState.getValue().entrySet()) {
                    keySerializer.serialize(entry.getKey(), outView);
                    stateSerializer.serialize(entry.getValue(), outView);
                }
            }
            outView.flush(); // the actual contents are flushed to the file

            // create a handle to the state
            return createHeapSnapshot(out.closeAndGetPath()); // the snapshot itself only needs the path
        }
    }
}

 

Key/value state itself comes in several flavors: ValueState, ListState, ReducingState, and FoldingState.

For simplicity, start with ValueState:

public class FsValueState<K, N, V>
        extends AbstractFsState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
        implements ValueState<V> {

    @Override
    public V value() {
        if (currentNSState == null) {
            currentNSState = state.get(currentNamespace); // first, look up the key/value map of the current namespace
        }
        if (currentNSState != null) {
            V value = currentNSState.get(currentKey);
            return value != null ? value : stateDesc.getDefaultValue(); // return the value, or the default from stateDesc if it is null
        }
        return stateDesc.getDefaultValue();
    }

    @Override
    public void update(V value) {
        if (currentKey == null) {
            throw new RuntimeException("No key available.");
        }

        if (value == null) {
            clear();
            return;
        }

        if (currentNSState == null) {
            currentNSState = new HashMap<>();
            state.put(currentNamespace, currentNSState);
        }

        currentNSState.put(currentKey, value); // update
    }

    @Override
    public KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createHeapSnapshot(Path filePath) {
        return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), stateSerializer, stateDesc, filePath); // create the snapshot from the file path
    }
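The value()/update() logic above boils down to a two-level map lookup with a fallback to a default. A minimal standalone sketch, assuming a hypothetical class ToyHeapValueState that is not part of Flink: it skips the cached currentNSState and the state descriptor, and takes the default value directly in the constructor.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy namespaced value state (hypothetical): namespace -> key -> value. */
final class ToyHeapValueState<K, N, V> {
    private final Map<N, Map<K, V>> state = new HashMap<>();
    private final V defaultValue;
    private K currentKey;
    private N currentNamespace;

    ToyHeapValueState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    /** Returns the value for (currentNamespace, currentKey), or the default if absent. */
    V value() {
        Map<K, V> nsState = state.get(currentNamespace);
        if (nsState == null) {
            return defaultValue;
        }
        V value = nsState.get(currentKey);
        return value != null ? value : defaultValue;
    }

    /** Stores the value; a null value clears the entry, mirroring the excerpt above. */
    void update(V value) {
        if (currentKey == null) {
            throw new IllegalStateException("No key available.");
        }
        if (value == null) {
            clear();
            return;
        }
        state.computeIfAbsent(currentNamespace, ns -> new HashMap<>()).put(currentKey, value);
    }

    /** Removes the current entry and drops the namespace map once it is empty. */
    void clear() {
        Map<K, V> nsState = state.get(currentNamespace);
        if (nsState != null) {
            nsState.remove(currentKey);
            if (nsState.isEmpty()) {
                state.remove(currentNamespace);
            }
        }
    }
}
```

The same key can hold different values under different namespaces, which is what lets one operator keep per-window state for a single key.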

 

Next, the snapshot side of FsState, AbstractFsStateSnapshot:

public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
        extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {

    public abstract KvState<K, N, S, SD, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, SV>> stateMap);

    @Override
    public KvState<K, N, S, SD, FsStateBackend> restoreState(
            FsStateBackend stateBackend,
            final TypeSerializer<K> keySerializer,
            ClassLoader classLoader) throws Exception {

        // state restore
        ensureNotClosed();

        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
            // make sure the in-progress restore from the handle can be closed
            registerCloseable(inStream);

            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

            final int numKeys = inView.readInt();
            HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

            for (int i = 0; i < numKeys; i++) {
                N namespace = namespaceSerializer.deserialize(inView);
                final int numValues = inView.readInt();
                Map<K, SV> namespaceMap = new HashMap<>(numValues);
                stateMap.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    K key = keySerializer.deserialize(inView);
                    SV value = stateSerializer.deserialize(inView);
                    namespaceMap.put(key, value);
                }
            }

            return createFsState(stateBackend, stateMap);
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
}
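snapshot() and restoreState() agree on a simple length-prefixed layout: the number of namespaces, then for each namespace its serialized form, its entry count, and the key/value pairs. A minimal sketch of that layout using only java.io, under the assumption that namespaces and keys are strings and values are longs. ToySnapshotFormat is a hypothetical name, and a byte array stands in for the checkpoint file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Toy version of the nested, length-prefixed snapshot layout (hypothetical helper). */
final class ToySnapshotFormat {

    /** Writes: numNamespaces, then per namespace: name, numEntries, (key, value)*. */
    static byte[] write(Map<String, Map<String, Long>> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Map<String, Long>> ns : state.entrySet()) {
                out.writeUTF(ns.getKey());
                out.writeInt(ns.getValue().size());
                for (Map.Entry<String, Long> entry : ns.getValue().entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the layout back in exactly the order it was written. */
    static Map<String, Map<String, Long>> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numNamespaces = in.readInt();
            Map<String, Map<String, Long>> state = new HashMap<>(numNamespaces);
            for (int i = 0; i < numNamespaces; i++) {
                String namespace = in.readUTF();
                int numValues = in.readInt();
                Map<String, Long> namespaceMap = new HashMap<>(numValues);
                state.put(namespace, namespaceMap);
                for (int j = 0; j < numValues; j++) {
                    String key = in.readUTF();
                    long value = in.readLong();
                    namespaceMap.put(key, value);
                }
            }
            return state;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every collection is prefixed with its size, the reader needs no delimiters and can pre-size its maps, which is the same trick the Flink excerpt uses with numKeys and numValues.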

 

The Snapshot class implemented inside FsValueState:
public static class Snapshot<K, N, V> extends AbstractFsStateSnapshot<K, N, V, ValueState<V>, ValueStateDescriptor<V>> {
    private static final long serialVersionUID = 1L;

    public Snapshot(TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<V> stateSerializer,
            ValueStateDescriptor<V> stateDescs,
            Path filePath) {
        super(keySerializer, namespaceSerializer, stateSerializer, stateDescs, filePath);
    }

    @Override
    public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, FsStateBackend> createFsState(FsStateBackend backend, HashMap<N, Map<K, V>> stateMap) {
        return new FsValueState<>(backend, keySerializer, namespaceSerializer, stateDesc, stateMap);
    }
}

 

2. FunctionState

Compared with KvState, StateHandle is a more general abstraction.

/**
 * StateHandle is a general handle interface meant to abstract operator state fetching.
 * A StateHandle implementation can for example include the state itself in cases where the state
 * is lightweight or fetching it lazily from some external storage when the state is too large.
 */
public interface StateHandle<T> extends StateObject {

    /**
     * This retrieves and return the state represented by the handle.
     *
     * @param userCodeClassLoader Class loader for deserializing user code specific classes
     *
     * @return The state represented by the handle.
     * @throws java.lang.Exception Thrown, if the state cannot be fetched.
     */
    T getState(ClassLoader userCodeClassLoader) throws Exception;
}
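The Javadoc mentions two styles of handle: one that carries lightweight state inline and one that fetches large state lazily. A rough sketch of the difference, with simplifications: ToyStateHandle, InlineHandle and LazyHandle are hypothetical names, getState() takes no class loader and throws no checked exception, and a Supplier stands in for the external storage.

```java
import java.util.function.Supplier;

/** Simplified stand-in for StateHandle (hypothetical). */
interface ToyStateHandle<T> {
    T getState();
}

/** Carries the state inside the handle itself; suited to small state. */
final class InlineHandle<T> implements ToyStateHandle<T> {
    private final T state;

    InlineHandle(T state) {
        this.state = state;
    }

    @Override
    public T getState() {
        return state;
    }
}

/** Holds only a way to load the state; nothing is fetched until getState() is called. */
final class LazyHandle<T> implements ToyStateHandle<T> {
    private final Supplier<T> loader;

    LazyHandle(Supplier<T> loader) {
        this.loader = loader;
    }

    @Override
    public T getState() {
        // Each call goes back to the loader; a real handle might read a file here.
        return loader.get();
    }
}
```

Callers see the same getState() either way, so the choice between inline and lazy storage stays an implementation detail of the backend.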

 

3. OperatorState, typically the state of a WindowOperator

OperatorState also uses StateHandle as its snapshot abstraction.

 

Now let's see how each of these three kinds of state is snapshotted.

AbstractStreamOperator: its checkpoint-related methods show that it only snapshots KvState.
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    if (stateBackend != null) {
        stateBackend.notifyOfCompletedCheckpoint(checkpointId);
    }
}

 

AbstractUdfStreamOperator
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT>

This class extends AbstractStreamOperator. Its checkpoint-related methods are:

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); // first run super's snapshotOperatorState, i.e. snapshot the key/value state

    if (userFunction instanceof Checkpointed) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable udfState;
        try {
            udfState = chkFunction.snapshotState(checkpointId, timestamp); // snapshot the function's state
        }
        catch (Exception e) {
            throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
        }

        if (udfState != null) {
            try {
                AbstractStateBackend stateBackend = getStateBackend();
                StateHandle<Serializable> handle =
                        stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); // have the state backend store the state and return a handle to the snapshot
                state.setFunctionState(handle);
            }
            catch (Exception e) {
                throw new Exception("Failed to add the state snapshot of the function to the checkpoint: "
                        + e.getMessage(), e);
            }
        }
    }

    return state;
}

@Override
public void restoreState(StreamTaskState state) throws Exception {
    super.restoreState(state);

    StateHandle<Serializable> stateHandle = state.getFunctionState();

    if (userFunction instanceof Checkpointed && stateHandle != null) {
        @SuppressWarnings("unchecked")
        Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction;

        Serializable functionState = stateHandle.getState(getUserCodeClassloader());
        if (functionState != null) {
            try {
                chkFunction.restoreState(functionState);
            }
            catch (Exception e) {
                throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
            }
        }
    }
}

@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    super.notifyOfCompletedCheckpoint(checkpointId);

    if (userFunction instanceof CheckpointListener) {
        ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
    }
}
As the code shows, this operator snapshots both the key/value state and the state of the user-defined function (UDF).
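The layering used here, where each subclass calls super first and then adds its own slice to the same result object, can be sketched without any Flink types. All names below (ToyTaskState, ToyCheckpointed, ToyBaseOperator, ToyUdfOperator) are hypothetical, and the function state is stored directly instead of going through a state backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy container for the slices of state collected during one checkpoint (hypothetical). */
final class ToyTaskState {
    Map<String, Integer> kvState;
    Object functionState;
}

/** Toy stand-in for the Checkpointed interface a user function may implement. */
interface ToyCheckpointed {
    Object snapshotState(long checkpointId);
}

/** Base operator: only knows how to snapshot key/value state. */
class ToyBaseOperator {
    final Map<String, Integer> kv = new HashMap<>();

    ToyTaskState snapshotOperatorState(long checkpointId) {
        ToyTaskState state = new ToyTaskState();
        state.kvState = new HashMap<>(kv); // copy so later updates do not alter the snapshot
        return state;
    }
}

/** UDF operator: reuses the base snapshot, then adds the user function's state if there is any. */
class ToyUdfOperator extends ToyBaseOperator {
    private final Object userFunction;

    ToyUdfOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    ToyTaskState snapshotOperatorState(long checkpointId) {
        ToyTaskState state = super.snapshotOperatorState(checkpointId);
        if (userFunction instanceof ToyCheckpointed) {
            state.functionState = ((ToyCheckpointed) userFunction).snapshotState(checkpointId);
        }
        return state;
    }
}
```

A function that does not implement the checkpoint interface simply leaves functionState null, so the same operator class works for stateless and stateful functions.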

 

WindowOperator, a typical example of operator state:
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable

public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {

    if (mergingWindowsByKey != null) {
        TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
        ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
        for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
            setKeyContext(key.getKey());
            ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
            mergeState.clear();
            key.getValue().persist(mergeState);
        }
    }

    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

    AbstractStateBackend.CheckpointStateOutputView out =
        getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

    snapshotTimers(out);

    taskState.setOperatorState(out.closeAndGetHandle());

    return taskState;
}

@Override
public void restoreState(StreamTaskState taskState) throws Exception {
    super.restoreState(taskState);

    final ClassLoader userClassloader = getUserCodeClassloader();

    @SuppressWarnings("unchecked")
    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
    DataInputView in = inputState.getState(userClassloader);

    restoreTimers(in);
}

Reposted from: http://bvtwa.baihongyu.com/
