FlinkCheckpoint源码分析

Checkpoint模块分配

  • checkpoint

    • channel:数据管道,包含checkpoint的读写

    • hooks:钩子函数

    • metadata:元数据

Channel模块

核心对象

inputChannelInfo

/**
 * 最终的物理执行层inputchannel信息,包含一个inputgate index和inputchannel index,gate和channel 1对多
 * 标识inputChannel给定的一个子任务
 * Identifies {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel} in a given subtask.
 * Note that {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannelID InputChannelID}
 * can not be used because it is generated randomly.
 */
@Internal
public class InputChannelInfo implements Serializable {
	private static final long serialVersionUID = 1L;

	// inputgate index
	private final int gateIdx;
	// inputChannel index
	private final int inputChannelIdx;

	public InputChannelInfo(int gateIdx, int inputChannelIdx) {
		this.gateIdx = gateIdx;
		this.inputChannelIdx = inputChannelIdx;
	}

	public int getGateIdx() {
		return gateIdx;
	}

	public int getInputChannelIdx() {
		return inputChannelIdx;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		final InputChannelInfo that = (InputChannelInfo) o;
		return gateIdx == that.gateIdx && inputChannelIdx == that.inputChannelIdx;
	}

	@Override
	public int hashCode() {
		return Objects.hash(gateIdx, inputChannelIdx);
	}

	@Override
	public String toString() {
		return "InputChannelInfo{" + "gateIdx=" + gateIdx + ", inputChannelIdx=" + inputChannelIdx + '}';
	}
}

ResultSubpartitionInfo

@Internal
public class ResultSubpartitionInfo implements Serializable {
	private static final long serialVersionUID = 1L;

	// partition index 一般一个并行度一个
	private final int partitionIdx;
	// sub partition index 一般为上游和下游并行度的乘积
	private final int subPartitionIdx;

	public ResultSubpartitionInfo(int partitionIdx, int subPartitionIdx) {
		this.partitionIdx = partitionIdx;
		this.subPartitionIdx = subPartitionIdx;
	}

	public int getPartitionIdx() {
		return partitionIdx;
	}

	public int getSubPartitionIdx() {
		return subPartitionIdx;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		final ResultSubpartitionInfo that = (ResultSubpartitionInfo) o;
		return partitionIdx == that.partitionIdx && subPartitionIdx == that.subPartitionIdx;
	}

	@Override
	public int hashCode() {
		return Objects.hash(partitionIdx, subPartitionIdx);
	}

	@Override
	public String toString() {
		return "ResultSubpartitionInfo{" + "partitionIdx=" + partitionIdx + ", subPartitionIdx=" + subPartitionIdx + '}';
	}
}

state reader

  • 读取状态的数据管道,用于判断管道内是否还有状态,读取输入/输出数据。

@Internal
public interface ChannelStateReader extends AutoCloseable {

	/**
	 * Status of reading result.
	 * 读取状态的结果
	 */
	enum ReadResult { HAS_MORE_DATA, NO_MORE_DATA }

	/**
	 * Return whether there are any channel states to be read.
	 */
	boolean hasChannelStates();

	/**
	 * Put data into the supplied buffer to be injected into
	 * {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
	 */
	ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException;

	/**
	 * Put data into the supplied buffer to be injected into
	 * {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
	 */
	ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException;

	@Override
	void close() throws Exception;

	/**
	 * 不进行操作
	 */
	ChannelStateReader NO_OP = new ChannelStateReader() {

		@Override
		public boolean hasChannelStates() {
			return false;
		}

		@Override
		public ReadResult readInputData(InputChannelInfo info, Buffer buffer) {
			return ReadResult.NO_MORE_DATA;
		}

		@Override
		public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) {
			return ReadResult.NO_MORE_DATA;
		}

		@Override
		public void close() {
		}
	};
}

public class ChannelStateReaderImpl implements ChannelStateReader {
	private static final Logger log = LoggerFactory.getLogger(ChannelStateReaderImpl.class);


	/**
	 * 俩个结果集更执行图有关,最终执行图从task-》inputchannel-》ResultSubpartition
	 */
	// 输出管道处理器
	private final Map<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReaders;
	// 结果子分区处理器
	private final Map<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReaders;
	// 是否关闭
	private boolean isClosed = false;

	public ChannelStateReaderImpl(TaskStateSnapshot snapshot) {
		this(snapshot, new ChannelStateSerializerImpl());
	}

	ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) {
		RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStreamFactory(serializer);
		final HashMap<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReadersTmp = new HashMap<>();
		final HashMap<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReadersTmp = new HashMap<>();
		for (Map.Entry<OperatorID, OperatorSubtaskState> e : snapshot.getSubtaskStateMappings()) {
			addReaders(inputChannelHandleReadersTmp, e.getValue().getInputChannelState(), streamFactory);
			addReaders(resultSubpartitionHandleReadersTmp, e.getValue().getResultSubpartitionState(), streamFactory);
		}
		inputChannelHandleReaders = inputChannelHandleReadersTmp; // memory barrier to allow another thread call clear()
		resultSubpartitionHandleReaders = resultSubpartitionHandleReadersTmp; // memory barrier to allow another thread call clear()
	}

	private <T> void addReaders(
			Map<T, ChannelStateStreamReader> readerMap,
			Collection<? extends AbstractChannelStateHandle<T>> handles,
			RefCountingFSDataInputStreamFactory streamFactory) {
		for (AbstractChannelStateHandle<T> handle : handles) {
			checkState(!readerMap.containsKey(handle.getInfo()), "multiple states exist for channel: " + handle.getInfo());
			readerMap.put(handle.getInfo(), new ChannelStateStreamReader(handle, streamFactory));
		}
	}

	@Override
	public boolean hasChannelStates() {
		return !(inputChannelHandleReaders.isEmpty() && resultSubpartitionHandleReaders.isEmpty());
	}

	@Override
	public ReadResult readInputData(InputChannelInfo info, Buffer buffer) throws IOException {
		Preconditions.checkState(!isClosed, "reader is closed");
		log.debug("readInputData, resultSubpartitionInfo: {} , buffer {}", info, buffer);
		ChannelStateStreamReader reader = inputChannelHandleReaders.get(info);
		// 数据放入Buffer
		return reader == null ? ReadResult.NO_MORE_DATA : reader.readInto(buffer);
	}

	@Override
	public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException {
		Preconditions.checkState(!isClosed, "reader is closed");
		log.debug("readOutputData, resultSubpartitionInfo: {} , bufferBuilder {}", info, bufferBuilder);
		ChannelStateStreamReader reader = resultSubpartitionHandleReaders.get(info);
		return reader == null ? ReadResult.NO_MORE_DATA : reader.readInto(bufferBuilder);
	}

	@Override
	public void close() throws Exception {
		isClosed = true;
		try (Closer closer = Closer.create()) {
			for (Map<?, ChannelStateStreamReader> map : asList(inputChannelHandleReaders, resultSubpartitionHandleReaders)) {
				for (ChannelStateStreamReader reader : map.values()) {
					closer.register(reader);
				}
				map.clear();
			}
		}
	}

}

state writer

  • checkpoint/savepoint写入器

public interface ChannelStateWriter extends Closeable {

	/**
	 * Channel state write result.
	 */
	class ChannelStateWriteResult {
		final CompletableFuture<Collection<InputChannelStateHandle>> inputChannelStateHandles;
		final CompletableFuture<Collection<ResultSubpartitionStateHandle>> resultSubpartitionStateHandles;

		ChannelStateWriteResult() {
			this(new CompletableFuture<>(), new CompletableFuture<>());
		}

		ChannelStateWriteResult(
				CompletableFuture<Collection<InputChannelStateHandle>> inputChannelStateHandles,
				CompletableFuture<Collection<ResultSubpartitionStateHandle>> resultSubpartitionStateHandles) {
			this.inputChannelStateHandles = inputChannelStateHandles;
			this.resultSubpartitionStateHandles = resultSubpartitionStateHandles;
		}

		public CompletableFuture<Collection<InputChannelStateHandle>> getInputChannelStateHandles() {
			return inputChannelStateHandles;
		}

		public CompletableFuture<Collection<ResultSubpartitionStateHandle>> getResultSubpartitionStateHandles() {
			return resultSubpartitionStateHandles;
		}

		public static final ChannelStateWriteResult EMPTY = new ChannelStateWriteResult(
			CompletableFuture.completedFuture(Collections.emptyList()),
			CompletableFuture.completedFuture(Collections.emptyList())
		);

		public void fail(Throwable e) {
			inputChannelStateHandles.completeExceptionally(e);
			resultSubpartitionStateHandles.completeExceptionally(e);
		}

		boolean isDone() {
			return inputChannelStateHandles.isDone() && resultSubpartitionStateHandles.isDone();
		}
	}

	/**
	 * Sequence number for the buffers that were saved during the previous execution attempt; then restored; and now are
	 * to be saved again (as opposed to the buffers received from the upstream or from the operator).
	 */
	int SEQUENCE_NUMBER_RESTORED = -1;

	/**
	 * Signifies that buffer sequence number is unknown (e.g. if passing sequence numbers is not implemented).
	 */
	int SEQUENCE_NUMBER_UNKNOWN = -2;

	/**
	 * Initiate write of channel state for the given checkpoint id.
	 */
	void start(long checkpointId, CheckpointOptions checkpointOptions);

	/**
	 * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputChannel InputChannel}.
	 * Must be called after {@link #start} (long)} and before {@link #finishInput(long)}.
	 * Buffers are recycled after they are written or exception occurs.
	 * @param startSeqNum sequence number of the 1st passed buffer.
	 *                    It is intended to use for incremental snapshots.
	 *                    If no data is passed it is ignored.
	 * @param data zero or more <b>data</b> buffers ordered by their sequence numbers
	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
	 */
	void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data);

	/**
	 * Add in-flight buffers from the {@link org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}.
	 * Must be called after {@link #start} and before {@link #finishOutput(long)}.
	 * Buffers are recycled after they are written or exception occurs.
	 * @param startSeqNum sequence number of the 1st passed buffer.
	 *                    It is intended to use for incremental snapshots.
	 *                    If no data is passed it is ignored.
	 * @param data zero or more <b>data</b> buffers ordered by their sequence numbers
	 * @throws IllegalArgumentException if one or more passed buffers {@link Buffer#isBuffer()  isn't a buffer}
	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_RESTORED
	 * @see org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter#SEQUENCE_NUMBER_UNKNOWN
	 */
	void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) throws IllegalArgumentException;

	/**
	 * Finalize write of channel state data for the given checkpoint id.
	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
	 * using {@link #getAndRemoveWriteResult}
	 */
	void finishInput(long checkpointId);

	/**
	 * Finalize write of channel state data for the given checkpoint id.
	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added.
	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
	 * using {@link #getAndRemoveWriteResult}
	 */
	void finishOutput(long checkpointId);

	/**
	 * Aborts the checkpoint and fails pending result for this checkpoint.
	 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
	 */
	void abort(long checkpointId, Throwable cause, boolean cleanup);

	/**
	 * Must be called after {@link #start(long, CheckpointOptions)} once.
	 * @throws IllegalArgumentException if the passed checkpointId is not known.
	 */
	ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException;

	ChannelStateWriter NO_OP = new NoOpChannelStateWriter();

	/**
	 * No-op implementation of {@link ChannelStateWriter}.
	 */
	class NoOpChannelStateWriter implements ChannelStateWriter {
		@Override
		public void start(long checkpointId, CheckpointOptions checkpointOptions) {
		}

		@Override
		public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {
		}

		@Override
		public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
		}

		@Override
		public void finishInput(long checkpointId) {
		}

		@Override
		public void finishOutput(long checkpointId) {
		}

		@Override
		public void abort(long checkpointId, Throwable cause, boolean cleanup) {
		}

		@Override
		public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
			return new ChannelStateWriteResult(
				CompletableFuture.completedFuture(Collections.emptyList()),
				CompletableFuture.completedFuture(Collections.emptyList()));
		}

		@Override
		public void close() {
		}
	}
}

// 实现类
public class ChannelStateWriterImpl implements ChannelStateWriter {

	private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
	private static final int DEFAULT_MAX_CHECKPOINTS = 1000; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)

	private final String taskName;
	private final ChannelStateWriteRequestExecutor executor;
	private final ConcurrentMap<Long, ChannelStateWriteResult> results;
	private final int maxCheckpoints;

	/**
	 * Creates a {@link ChannelStateWriterImpl} with {@link #DEFAULT_MAX_CHECKPOINTS} as {@link #maxCheckpoints}.
	 */
	public ChannelStateWriterImpl(String taskName, CheckpointStorageWorkerView streamFactoryResolver) {
		this(taskName, streamFactoryResolver, DEFAULT_MAX_CHECKPOINTS);
	}

	/**
	 * Creates a {@link ChannelStateWriterImpl} with {@link ChannelStateSerializerImpl default} {@link ChannelStateSerializer},
	 * and a {@link ChannelStateWriteRequestExecutorImpl}.
	 *  @param taskName
	 * @param streamFactoryResolver a factory to obtain output stream factory for a given checkpoint
	 * @param maxCheckpoints        maximum number of checkpoints to be written currently or finished but not taken yet.
	 */
	ChannelStateWriterImpl(String taskName, CheckpointStorageWorkerView streamFactoryResolver, int maxCheckpoints) {
		this(
			taskName,
			new ConcurrentHashMap<>(maxCheckpoints),
			new ChannelStateWriteRequestExecutorImpl(taskName, new ChannelStateWriteRequestDispatcherImpl(streamFactoryResolver, new ChannelStateSerializerImpl())),
			maxCheckpoints);
	}

	ChannelStateWriterImpl(
			String taskName,
			ConcurrentMap<Long, ChannelStateWriteResult> results,
			ChannelStateWriteRequestExecutor executor,
			int maxCheckpoints) {
		this.taskName = taskName;
		this.results = results;
		this.maxCheckpoints = maxCheckpoints;
		this.executor = executor;
	}

	@Override
	public void start(long checkpointId, CheckpointOptions checkpointOptions) {
		LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
		ChannelStateWriteResult result = new ChannelStateWriteResult();
		// 发送checkpoint开启请求
		ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
			Preconditions.checkState(results.size() < maxCheckpoints, String.format("%s can't start %d, results.size() > maxCheckpoints: %d > %d", taskName, checkpointId, results.size(), maxCheckpoints));
			// 发送请求
			enqueue(new CheckpointStartRequest(checkpointId, result, checkpointOptions.getTargetLocation()), false);
			return result;
		});
		Preconditions.checkArgument(put == result, taskName + " result future already present for checkpoint " + checkpointId);
	}

	@Override
	public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
		LOG.debug(
			"{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}",
			taskName,
			checkpointId,
			info,
			startSeqNum);
		// 将inputChannel信息写入buffer迭代器
		enqueue(write(checkpointId, info, iterator), false);
	}

	@Override
	public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {
		LOG.debug(
			"{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}",
			taskName,
			checkpointId,
			info,
			startSeqNum,
			data == null ? 0 : data.length);
		// 发送写请求
		enqueue(write(checkpointId, info, data), false);
	}

	@Override
	public void finishInput(long checkpointId) {
		LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
		// 发送完成请求input
		enqueue(completeInput(checkpointId), false);
	}

	@Override
	public void finishOutput(long checkpointId) {
		LOG.debug("{} finishing output data, checkpoint {}", taskName, checkpointId);
		enqueue(completeOutput(checkpointId), false);
	}

	@Override
	public void abort(long checkpointId, Throwable cause, boolean cleanup) {
		LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
		// 中断开始的checkpoint和未开始的
		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), true); // abort already started
		enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), false); // abort enqueued but not started
		if (cleanup) {
			results.remove(checkpointId);
		}
	}

	@Override
	public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
		ChannelStateWriteResult result = results.remove(checkpointId);
		Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId);
		return result;
	}

	public void open() {
		executor.start();
	}

	@Override
	public void close() throws IOException {
		LOG.debug("close, dropping checkpoints {}", results.keySet());
		results.clear();
		executor.close();
	}

	private void enqueue(ChannelStateWriteRequest request, boolean atTheFront) {
		// state check and previous errors check are performed inside the worker
		try {
			if (atTheFront) {
				executor.submitPriority(request);
			} else {
				// 提交checkpount请求
				executor.submit(request);
			}
		} catch (Exception e) {
			RuntimeException wrapped = new RuntimeException("unable to send request to worker", e);
			try {
				request.cancel(e);
			} catch (Exception cancelException) {
				wrapped.addSuppressed(cancelException);
			}
			throw wrapped;
		}
	}

	private static String buildBufferTypeErrorMessage(Buffer buffer) {
		try {
			AbstractEvent event = EventSerializer.fromBuffer(buffer, ChannelStateWriterImpl.class.getClassLoader());
			return String.format("Should be buffer but [%s] found", event);
		}
		catch (Exception ex) {
			return "Should be buffer";
		}
	}
}

writer requset dispatcher

/**
 * 状态写请求分发器
 */
interface ChannelStateWriteRequestDispatcher {

	// 分发请求
	void dispatch(ChannelStateWriteRequest request) throws Exception;

	// 错误
	void fail(Throwable cause);

	ChannelStateWriteRequestDispatcher NO_OP = new ChannelStateWriteRequestDispatcher() {
		@Override
		public void dispatch(ChannelStateWriteRequest request) {
		}

		@Override
		public void fail(Throwable cause) {
		}
	};
}

Hooks模块

  • 核心执行checkpoint模块,包含出发ck,恢复ck等等

triggerHook

  • 触发checkpoint

restoreMasterHooks

  • 恢复checkpoint状态

Metadata模块

  • 反序列化checkpoint元数据,提供多种反序列化器

CheckpointMetadata

  • checkpoint元数据数据模型

public class CheckpointMetadata implements Disposable {

	/** The checkpoint ID. */
	private final long checkpointId;

	/** The operator states. */
	// 算子状态
	private final Collection<OperatorState> operatorStates;

	/** The states generated by the CheckpointCoordinator. */
	// checkpoint协调器生成的状态
	private final Collection<MasterState> masterStates;

	public CheckpointMetadata(long checkpointId, Collection<OperatorState> operatorStates, Collection<MasterState> masterStates) {
		this.checkpointId = checkpointId;
		this.operatorStates = operatorStates;
		this.masterStates = checkNotNull(masterStates, "masterStates");
	}

	public long getCheckpointId() {
		return checkpointId;
	}

	public Collection<OperatorState> getOperatorStates() {
		return operatorStates;
	}

	public Collection<MasterState> getMasterStates() {
		return masterStates;
	}

	@Override
	public void dispose() throws Exception {
		for (OperatorState operatorState : operatorStates) {
			operatorState.discardState();
		}
		operatorStates.clear();
		masterStates.clear();
	}

	@Override
	public String toString() {
		return "Checkpoint Metadata";
	}
}

MetadataSerializer

  • 统一元数据序列化器,提供savepoint反序列化接口

public interface MetadataSerializer extends Versioned {

	/**
	 * 根据输入流反序列化savepoint
	 * Deserializes a savepoint from an input stream.
	 *
	 * @param dis Input stream to deserialize savepoint from
	 * @param  userCodeClassLoader the user code class loader
	 * @param externalPointer the external pointer of the given checkpoint
	 * @return The deserialized savepoint
	 * @throws IOException Serialization failures are forwarded
	 */
	CheckpointMetadata deserialize(DataInputStream dis, ClassLoader userCodeClassLoader, String externalPointer) throws IOException;
}

MetadataV2V3SerializerBase

  • 基本checkpoint元数据布局

 *  +--------------+---------------+-----------------+
 *  | checkpointID | master states | operator states |
 *  +--------------+---------------+-----------------+
  • master state

 * +--------------+---------------------+---------+------+---------------+
 *  | magic number | num remaining bytes | version | name | payload bytes |
 *  +--------------+---------------------+---------+------+---------------+
  • 主要用来序列化、反序列化基本checkpoint metadata和master state、operator state等

checkpoint配置转换

StreamExecutionEnvironment配置

Checkpoint从StreamNode传递到JobGraph

-->StreamExecutionEnvironment#enableCheckpointing
	-->PipelineExecutor#execute
		-->FlinkPipelineTranslationUtil#getJobGraph
			-->StreamGraphTranslator#translateToJobGraph
				-->StreamGraph#getJobGraph
					-->StreamingJobGraphGenerator#createJobGraph
						-->StreamingJobGraphGenerator#configureCheckpointing
  • 最终将streamGraph的CheckpointConfig转换为jobGraphSnapshotSettings(JobCheckpointingSettings)

Checkpoint从JobGraph传递到ExecutionGraph

-->ExecutionGraphBuilder#buildGraph
	-->ExecutionGraph#enableCheckpointing

Checkpoint机制

恢复状态流程

  • 生成ExecutionGraph时会尝试恢复历史状态

private ExecutionGraph createAndRestoreExecutionGraph(
		JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
		ShuffleMaster<?> shuffleMaster,
		JobMasterPartitionTracker partitionTracker,
		ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {

		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker);

		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

		// 恢复checkpoint状态
		if (checkpointCoordinator != null) {
			// check whether we find a valid checkpoint
			if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
				new HashSet<>(newExecutionGraph.getAllVertices().values()),
				false)) {

				// check whether we can restore from a savepoint
				// 检查恢复状态
				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
			}
		}

		return newExecutionGraph;
	}

// 恢复savepoint
private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
		if (savepointRestoreSettings.restoreSavepoint()) {
			final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
			if (checkpointCoordinator != null) {
				// 恢复savepoint
				checkpointCoordinator.restoreSavepoint(
					savepointRestoreSettings.getRestorePath(),
					savepointRestoreSettings.allowNonRestoredState(),
					executionGraphToRestore.getAllVertices(),
					userCodeLoader);
			}
		}
	}
  • 恢复savepoint

public boolean restoreSavepoint(
			String savepointPointer,
			boolean allowNonRestored,
			Map<JobVertexID, ExecutionJobVertex> tasks,
			ClassLoader userClassLoader) throws Exception {

		Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");

		LOG.info("Starting job {} from savepoint {} ({})",
				job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));

		// 解析checkpoint path
		final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);

		// Load the savepoint as a checkpoint into the system
		// 加载校验checkpoint
		CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
				job, tasks, checkpointLocation, userClassLoader, allowNonRestored);

		// 添加到完成的checkpoint存储,底层存储为双端队列
		completedCheckpointStore.addCheckpoint(savepoint);

		// Reset the checkpoint ID counter
		long nextCheckpointId = savepoint.getCheckpointID() + 1;
		// 设置下次checkpointid
		checkpointIdCounter.setCount(nextCheckpointId);

		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);

		// 恢复最后checkpoint状态
		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored);
	}
  • 恢复最新的checkpoint状态

private boolean restoreLatestCheckpointedStateInternal(
		final Set<ExecutionJobVertex> tasks,
		final boolean restoreCoordinators,
		final boolean errorIfNoCheckpoint,
		final boolean allowNonRestoredState) throws Exception {

		synchronized (lock) {
			if (shutdown) {
				throw new IllegalStateException("CheckpointCoordinator is shut down");
			}

			// We create a new shared state registry object, so that all pending async disposal requests from previous
			// runs will go against the old object (were they can do no harm).
			// This must happen under the checkpoint lock.
			sharedStateRegistry.close();
			sharedStateRegistry = sharedStateRegistryFactory.create(executor);

			// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
			// 恢复checkpoint
			completedCheckpointStore.recover();

			// Now, we re-register all (shared) states from the checkpoint store with the new registry
			for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
				// 注册共享状态之后恢复
				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
			}

			LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);

			// Restore from the latest checkpoint
			// 获取最后的checkpoint
			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);

			if (latest == null) {
				if (errorIfNoCheckpoint) {
					throw new IllegalStateException("No completed checkpoint available");
				} else {
					LOG.debug("Resetting the master hooks.");
					MasterHooks.reset(masterHooks.values(), LOG);

					return false;
				}
			}

			LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);

			// re-assign the task states
			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

			// 用于恢复savepoint和恢复checkpoint
			StateAssignmentOperation stateAssignmentOperation =
					new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);

			// 分配状态
			stateAssignmentOperation.assignStates();

			// call master hooks for restore. we currently call them also on "regional restore" because
			// there is no other failure notification mechanism in the master hooks
			// ultimately these should get removed anyways in favor of the operator coordinators

			MasterHooks.restoreMasterHooks(
					masterHooks,
					latest.getMasterHookStates(),
					latest.getCheckpointID(),
					allowNonRestoredState,
					LOG);

			if (restoreCoordinators) {
				restoreStateToCoordinators(operatorStates);
			}

			// update metrics

			if (statsTracker != null) {
				long restoreTimestamp = System.currentTimeMillis();
				RestoredCheckpointStats restored = new RestoredCheckpointStats(
					latest.getCheckpointID(),
					latest.getProperties(),
					restoreTimestamp,
					latest.getExternalPointer());

				statsTracker.reportRestoredCheckpoint(restored);
			}

			return true;
		}
	}

CheckpointCoordinator机制

初始化CheckpointCoordinator

public CheckpointCoordinator(
			JobID job,
			CheckpointCoordinatorConfiguration chkConfig,
			ExecutionVertex[] tasksToTrigger,
			ExecutionVertex[] tasksToWaitFor,
			ExecutionVertex[] tasksToCommitTo,
			Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
			CheckpointIDCounter checkpointIDCounter,
			CompletedCheckpointStore completedCheckpointStore,
			StateBackend checkpointStateBackend,
			Executor executor,
			ScheduledExecutor timer,
			SharedStateRegistryFactory sharedStateRegistryFactory,
			CheckpointFailureManager failureManager,
			Clock clock) {

		// sanity checks
		checkNotNull(checkpointStateBackend);

		// max "in between duration" can be one year - this is to prevent numeric overflows
		// 获取最小checkpoint暂停间隔
		long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
		if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
			minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
		}

		// it does not make sense to schedule checkpoints more often then the desired
		// time between checkpoints
		// 获取基础间隔
		long baseInterval = chkConfig.getCheckpointInterval();
		// 如果基础间隔小雨最小checkpoint暂停间隔则基础间隔等于最小暂停
		if (baseInterval < minPauseBetweenCheckpoints) {
			baseInterval = minPauseBetweenCheckpoints;
		}

		this.job = checkNotNull(job);
		this.baseInterval = baseInterval;
		this.checkpointTimeout = chkConfig.getCheckpointTimeout();
		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
		this.tasksToTrigger = checkNotNull(tasksToTrigger);
		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
		this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
		this.pendingCheckpoints = new LinkedHashMap<>();
		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
		this.executor = checkNotNull(executor);
		this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
		this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
		this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
		this.failureManager = checkNotNull(failureManager);
		this.clock = checkNotNull(clock);
		this.isExactlyOnceMode = chkConfig.isExactlyOnce();
		this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();

		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
		this.masterHooks = new HashMap<>();

		this.timer = timer;

		// 解析checkpoint参数
		this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

		try {
			// 创建checkpoint存储
			this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
			// 初始化checkpoint
			checkpointStorage.initializeBaseLocations();
		} catch (IOException e) {
			throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", e);
		}

		try {
			// Make sure the checkpoint ID enumerator is running. Possibly
			// issues a blocking call to ZooKeeper.
			// 启动checkpointId计数器,提供基于Atomic内存计数器和Zookeeper计数器
			checkpointIDCounter.start();
		} catch (Throwable t) {
			throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
		}
		this.requestDecider = new CheckpointRequestDecider(
			chkConfig.getMaxConcurrentCheckpoints(),
			this::rescheduleTrigger,
			this.clock,
			this.minPauseBetweenCheckpoints,
			this.pendingCheckpoints::size);
	}

触发savepoint

public CompletableFuture<CompletedCheckpoint> triggerSavepoint(@Nullable final String targetLocation) {
		// 是否不对齐checkpoint
		final CheckpointProperties properties = CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
		// 触发savepoint
		return triggerSavepointInternal(properties, false, targetLocation);
	}


private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
			final CheckpointProperties checkpointProperties,
			final boolean advanceToEndOfEventTime,
			@Nullable final String targetLocation) {

		checkNotNull(checkpointProperties);

		// TODO, call triggerCheckpoint directly after removing timer thread
		// for now, execute the trigger in timer thread to avoid competition
		// 记录结果,异步调用
		final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
		// 触发checkpoint
		timer.execute(() -> triggerCheckpoint(
			checkpointProperties,
			targetLocation,
			false,
			advanceToEndOfEventTime)
		.whenComplete((completedCheckpoint, throwable) -> {
			if (throwable == null) {
				resultFuture.complete(completedCheckpoint);
			} else {
				resultFuture.completeExceptionally(throwable);
			}
		}));
		return resultFuture;
	}

	public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
			CheckpointProperties props,
			@Nullable String externalSavepointLocation,
			boolean isPeriodic,
			boolean advanceToEndOfTime) {

		if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
			return FutureUtils.completedExceptionally(new IllegalArgumentException(
				"Only synchronous savepoints are allowed to advance the watermark to MAX."));
		}

		// 创建checkpoint请求
		CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime);
		// 关闭请求并且执行
		chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
		return request.onCompletionPromise;
	}

snapshotTaskState

private void snapshotTaskState(
		long timestamp,
		long checkpointID,
		CheckpointStorageLocation checkpointStorageLocation,
		CheckpointProperties props,
		Execution[] executions,
		boolean advanceToEndOfTime) {

		final CheckpointOptions checkpointOptions = new CheckpointOptions(
			props.getCheckpointType(),
			checkpointStorageLocation.getLocationReference(),
			isExactlyOnceMode,
			props.getCheckpointType() == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled);

		// send the messages to the tasks that trigger their checkpoint
		for (Execution execution: executions) {
			// 如果是同步
			if (props.isSynchronous()) {
				execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
			} else {
				// 否则触发checkpoint
				execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
			}
		}
	}

checkpoint生命周期管理

  • 发送ack消息通知checkpoint完成

private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
		// commit tasks
		// 通知checkpoint完成
		for (ExecutionVertex ev : tasksToCommitTo) {
			Execution ee = ev.getCurrentExecutionAttempt();
			if (ee != null) {
				ee.notifyCheckpointComplete(checkpointId, timestamp);
			}
		}

		// commit coordinators,提交对应的coordinators
		for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
			coordinatorContext.checkpointComplete(checkpointId);
		}
	}
  • 发送中断消息,通知checkpoint中断

private void sendAbortedMessages(long checkpointId, long timeStamp) {
		// send notification of aborted checkpoints asynchronously.
		executor.execute(() -> {
			// send the "abort checkpoint" messages to necessary vertices.
			for (ExecutionVertex ev : tasksToCommitTo) {
				Execution ee = ev.getCurrentExecutionAttempt();
				if (ee != null) {
					ee.notifyCheckpointAborted(checkpointId, timeStamp);
				}
			}
		});
	}

最后更新于