[Flink] Flink Fault Tolerance: A Source-Code Analysis of the Checkpoint Mechanism
Contents:
Introduction
Apache Flink provides a fault-tolerance mechanism that gives Flink applications an exactly-once guarantee. The mechanism is based on asynchronous distributed snapshots; see the paper Lightweight Asynchronous Snapshots for Distributed Dataflows. These distributed snapshots can be stored in a configurable backend such as the JobManager or HDFS. When a program fails (or a hardware error occurs), Flink stops the distributed dataflow, resets it to the most recent successful checkpoint, and replays the input streams and the state of every operator. This guarantees that no record processed in the restarted parallel dataflow was already part of the checkpointed state, achieving exactly-once fault tolerance.
Flink's checkpoint mechanism based on distributed snapshots is one of its biggest highlights (others include the window mechanism and the handling of out-of-order records). In the rest of this article we walk through the whole Flink checkpoint process at the source-code level.
A Few Basic Questions
Before diving into the checkpoint source code, let us first cover a few basic questions related to checkpoints.
Which objects need fault tolerance?
In Flink, two kinds of objects need fault-tolerance support: functions and operators. A function takes snapshots by implementing the Checkpointed interface (replaced by CheckpointedFunction in newer versions); an operator takes snapshots by implementing the StreamOperator interface. Both interfaces contain snapshot-related methods such as snapshotState.
What is State?
State usually refers to the state of a specific task/operator, whereas a checkpoint is a global snapshot of a Flink job at a given moment, containing the states of all its tasks/operators. Flink has two basic kinds of state: Keyed State and Operator State.
Keyed State
As the name suggests, this is state on a KeyedStream. It is bound to a specific key: for every key in the KeyedStream there may be a corresponding state.
Operator State
Unlike Keyed State, Operator State is bound to one parallel instance of a particular operator, so the whole instance corresponds to exactly one state. By comparison, a single operator instance may see many keys and therefore many keyed states.
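As a toy illustration of the difference (plain Java, not Flink code; all class and method names below are made up for this sketch), keyed state can be pictured as one value per key, while operator state is a single value per parallel operator instance:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the two state kinds (illustration only, not Flink code).
public class StateKindsSketch {
    // "Keyed state": one counter per key, as on a KeyedStream.
    static class KeyedCounter {
        private final Map<String, Long> perKey = new HashMap<>();
        long add(String key) {
            return perKey.merge(key, 1L, Long::sum);
        }
    }

    // "Operator state": a single counter bound to the whole operator
    // instance, regardless of which key a record carries.
    static class OperatorCounter {
        private long count;
        long add(String ignoredKey) { return ++count; }
    }

    public static void main(String[] args) {
        KeyedCounter keyed = new KeyedCounter();
        OperatorCounter op = new OperatorCounter();
        for (String k : new String[]{"a", "b", "a"}) {
            keyed.add(k);
            op.add(k);
        }
        // keyed state: a -> 2, b -> 1; operator-wide state: 3 records seen
        System.out.println(keyed.perKey + " vs " + op.count);
    }
}
```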
Raw and Managed State
Both Keyed State and Operator State can exist in two forms in Flink: raw state and managed state.
Managed state is state managed by the Flink framework, such as ValueState, ListState and MapState; we update and manage its values through the interfaces the framework provides. Raw state, in contrast, is state whose concrete data structure is managed by the user; when checkpointing, the framework reads and writes it as a plain byte stream and knows nothing about its internal structure (so it cannot apply any optimizations). Managed state is recommended for state on a DataStream; raw state is typically used when implementing a custom operator. See the official documentation for a detailed description of state.
What is a Barrier?
One of the core concepts of Flink's distributed snapshots is the barrier. Barriers are injected into the data stream and flow downstream with it, as part of the stream. A barrier never overtakes records; the flow remains strictly in order. A barrier splits the stream into two parts: records before it belong to the current snapshot, records after it belong to the next one. Each barrier carries a snapshot ID, and all records preceding the barrier belong to that snapshot. Barriers do not interrupt the processing of the stream, so they are very lightweight. Barriers for several different snapshots can be in flight at the same time, which means several snapshots may be in progress concurrently.
Barriers are injected at the sources. When the barrier for snapshot n is injected, the system records the current position in the stream; the barrier then flows downstream. When an operator has received a barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its output streams. When a sink has received barrier n from all of its input streams, it acknowledges snapshot n to the CheckpointCoordinator. Once every sink has acknowledged the snapshot, it is considered complete.
Note that an operator receiving more than one input stream needs to align on the barriers, as the figure above shows:
1. As soon as the operator receives barrier n from one input stream, it must stop processing further records from that stream until it has received barrier n from all other streams as well; otherwise records belonging to snapshot n would be mixed with records belonging to snapshot n+1.
2. The streams that have already delivered barrier n are put on hold; records arriving on them are placed into an input buffer.
3. When barrier n has been received from the last stream, the operator emits all pending outgoing records and then emits barrier n for snapshot n.
4. After these steps, the operator resumes processing records from all input streams, processing the buffered records first.
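The four alignment steps above can be sketched with a small simulation (plain Java, not Flink's actual BarrierBuffer; all names here are hypothetical, and records from blocked channels are simply flushed after the barrier to stand in for reprocessing the input buffer):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy simulation of barrier alignment on a multi-input operator.
public class BarrierAlignmentSketch {
    static final String BARRIER = "BARRIER";

    // arrivals: each element is {channelIndex, payload}, in arrival order.
    public static List<String> align(List<String[]> arrivals, int numChannels) {
        boolean[] blocked = new boolean[numChannels];
        Queue<String> buffered = new ArrayDeque<>();
        List<String> output = new ArrayList<>();
        int barriersSeen = 0;
        for (String[] arrival : arrivals) {
            int channel = Integer.parseInt(arrival[0]);
            String payload = arrival[1];
            if (BARRIER.equals(payload)) {
                blocked[channel] = true;              // steps 1-2: stop consuming this channel
                if (++barriersSeen == numChannels) {  // barrier n seen on the last stream
                    output.add(BARRIER);              // step 3: forward barrier n downstream
                    output.addAll(buffered);          // step 4: buffered records go first
                    buffered.clear();
                    Arrays.fill(blocked, false);      // resume normal processing
                    barriersSeen = 0;
                }
            } else if (blocked[channel]) {
                buffered.add(payload);                // belongs to snapshot n+1: buffer it
            } else {
                output.add(payload);                  // belongs to snapshot n: pass through
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> arrivals = List.of(
            new String[]{"0", "x1"}, new String[]{"0", BARRIER},
            new String[]{"0", "x2"},      // channel 0 is blocked, so x2 is buffered
            new String[]{"1", "y1"},      // channel 1 still pre-barrier, passes through
            new String[]{"1", BARRIER});  // alignment complete
        System.out.println(align(arrivals, 2));  // [x1, y1, BARRIER, x2]
    }
}
```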
The Flink Checkpoint Process
Below is a flow chart I drew based on the source code and the documentation. Steps such as 1.1 and 2.2 in the figure correspond one-to-one with the text below.
Phase One
On the client, the StreamGraph is generated and converted into a JobGraph; we will not expand on that here. One thing worth noting is that configureCheckpointing is called while the JobGraph is being generated, to set up the checkpoint configuration. A crucial detail in that method is the call triggerVertices.add(vertex.getID()): only the input (source) JobVertex instances are added to the triggerVertices set that triggers checkpoints. This determines that the whole triggerCheckpoint logic later initiated by the CheckpointCoordinator targets only the sources, which is very important to keep in mind.
1.1 Once generated, the JobGraph is submitted to the JobManager.
private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig(); // fetch the checkpoint configuration
    if (cfg.isCheckpointingEnabled()) {
        long interval = cfg.getCheckpointInterval(); // checkpoint interval
        // collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) { // only source vertices join triggerVertices: checkpoints are triggered at the sources only
                triggerVertices.add(vertex.getID());
            }
            // TODO: add check whether the user function implements the checkpointing interface
            commitVertices.add(vertex.getID()); // currently every vertex joins commitVertices and ackVertices
            ackVertices.add(vertex.getID());
        }
        JobSnapshottingSettings settings = new JobSnapshottingSettings( // build the JobSnapshottingSettings
            triggerVertices, ackVertices, commitVertices, interval,
            cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
            cfg.getMaxConcurrentCheckpoints());
        jobGraph.setSnapshotSettings(settings); // attach the settings to the JobGraph
        // if the user enabled checkpointing, the default number of exec retries is infinite.
        int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
        if (executionRetries == -1) {
            streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
        }
    }
}
Summary: in phase one the client generates the JobGraph, collects all checkpoint configuration, and submits the job to the JobManager.
Phase Two
2.1 When the JobManager's submitJob method is invoked, it builds the ExecutionGraph from the JobGraph and picks up all checkpoint configuration, including the trigger set triggerVertices, the ACK set ackVertices and the commit set commitVertices mentioned above. The ExecutionGraph also initializes the checkpointCoordinator and creates a CheckpointCoordinatorDeActivator for it: a switch that starts and stops the periodic checkpoint trigger.
The ExecutionGraph creates the CheckpointCoordinator:
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobInformation.getJobId(),
interval,
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
externalizeSettings,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
checkpointDir,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);
// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
if (!checkpointCoordinator.addMasterHook(hook)) {
LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
}
}
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
if (interval != Long.MAX_VALUE) {
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
After the ExecutionGraph has been initialized, the JobManager's submit method goes on to schedule it asynchronously.
// execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
// because it is a blocking operation
future {
  try {
    if (isRecovery) {
      executionGraph.restoreLatestCheckpointedState() // restore the latest checkpointed state
    }
    else {
      //......
    }
    submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) // track the jobGraph in submittedJobGraphs
  }
  jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) // tell the client the submission succeeded
  if (leaderElectionService.hasLeadership) {
    executionGraph.scheduleForExecution(scheduler) // actually schedule the executionGraph
  } else {
    //......
  }
} catch {
  //.......
}
}(context.dispatcher)
2.2 Once the submitted Flink job is running, the CheckpointCoordinatorDeActivator keeps listening for job status changes. When it observes the job entering RUNNING, it starts the timer's periodic task.
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
private final CheckpointCoordinator coordinator;
public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
this.coordinator = checkNotNull(coordinator);
}
@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
}
startCheckpointScheduler performs some pre-checks when it starts:
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
stopCheckpointScheduler();
periodicScheduling = true;
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(),
baseInterval, baseInterval, TimeUnit.MILLISECONDS);
}
}
The timer runs the registered task, a ScheduledTrigger:
private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint.", e);
}
}
}
2.3 This reaches triggerCheckpoint, the core method that triggers one checkpoint (again, aimed at the sources).
triggerCheckpoint performs a number of checks, among them whether the number of concurrent checkpoints currently in progress exceeds the configured maximum and whether the interval since the last checkpoint is too small. If any of these conditions fail, the trigger request is not executed.
CheckpointTriggerResult triggerCheckpoint(
long timestamp,
CheckpointProperties props,
String targetDirectory,
boolean isPeriodic) {
// Sanity check
if (props.externalizeCheckpoint() && targetDirectory == null) {
throw new IllegalStateException("No target directory specified to persist checkpoint to.");
}
// make some eager pre-checks
synchronized (lock) {
// abort if the coordinator has been shutdown in the meantime
if (shutdown) {
return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
}
// Don't allow periodic checkpoint if scheduling has been disabled
if (isPeriodic && !periodicScheduling) {
return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
}
// validate whether the checkpoint can be triggered, with respect to the limit of
// concurrent checkpoints, and the minimum time between checkpoints.
// these checks are not relevant for savepoints
if (!props.forceCheckpoint()) {
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint while one was queued already");
return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
}
// if too many checkpoints are currently in progress, we need to mark that a request is queued
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
// make sure the minimum interval between checkpoints has passed
final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(),
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
}
All of the above checks run inside a lock-protected synchronized block.
Next it checks that all tasks that need to be triggered are running:
// check if all tasks that we need to trigger are running.
// if not, abort the checkpoint
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex());
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
If even one task does not meet the condition, no checkpoint is triggered and the method returns immediately.
It then checks that all tasks that need to acknowledge the checkpoint are running:
// next, check if all tasks that need to acknowledge the checkpoint are running.
// if not, abort the checkpoint
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex());
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
Again, if any task fails the condition, no checkpoint is triggered and the method returns immediately. Once all of these conditions hold, the basic prerequisites for triggering a checkpoint are met. The next step is to generate the checkpoint ID:
final long checkpointID;
try {
// this must happen outside the coordinator-wide lock, because it communicates
// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
Then a PendingCheckpoint object is created:
final PendingCheckpoint checkpoint = new PendingCheckpoint(
job,
checkpointID,
timestamp,
ackTasks,
props,
targetDirectory,
executor);
This class represents a checkpoint that is still in progress.
At the same time, a canceller is defined that cleans up resources if the current checkpoint times out. The canceller releases resources the checkpoint did not release itself, and also calls triggerQueuedRequests to start a queued checkpoint-trigger task, if there is one (depending on whether triggerRequestQueued is true).
The code then enters a synchronized block again and re-checks the conditions for creating a checkpoint, to guard against race conditions. The second check is needed because the checkpoint-ID generation in between runs outside the synchronized block.
If the conditions still hold after the re-check, the PendingCheckpoint created above is added to the pending map, and the timeout canceller for this checkpoint is scheduled:
pendingCheckpoints.put(checkpointID, checkpoint);
ScheduledFuture<?> cancellerHandle = timer.schedule(
canceller,
checkpointTimeout, TimeUnit.MILLISECONDS);
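The check / unlock / re-check pattern used here can be sketched as follows (a simplified illustration, not the real CheckpointCoordinator; the class and field names are made up). The expensive ID generation happens outside the lock, so the cheap admission check must be repeated under the lock afterwards:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the double-check pattern around checkpoint-id generation.
public class DoubleCheckSketch {
    private final Object lock = new Object();
    private final AtomicLong idCounter = new AtomicLong();
    private int pending;                 // guarded by lock
    private final int maxPending = 1;    // stand-in for maxConcurrentCheckpointAttempts

    // Returns the admitted checkpoint id, or null if the request is declined.
    Long trigger() {
        synchronized (lock) {            // first check, under the lock
            if (pending >= maxPending) return null;
        }
        // May block (e.g. talks to an external service in HA mode),
        // so it deliberately runs outside the lock.
        long checkpointId = idCounter.incrementAndGet();
        synchronized (lock) {            // second check: state may have changed meanwhile
            if (pending >= maxPending) return null;
            pending++;
            return checkpointId;
        }
    }

    void complete() { synchronized (lock) { pending--; } }

    public static void main(String[] args) {
        DoubleCheckSketch c = new DoubleCheckSketch();
        System.out.println(c.trigger()); // first request admitted: 1
        System.out.println(c.trigger()); // declined, one checkpoint already pending: null
    }
}
```

Note that a declined request still consumes an ID, which mirrors how checkpoint IDs in Flink need not be consecutive.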
2.4 Next, messages are sent to the tasks (via Akka) to actually trigger the checkpoint:
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
Summary: phase two takes place mainly in the CheckpointCoordinator and ends with the trigger-checkpoint message being sent to the TaskManager.
Phase Three
3.1 The TaskManager handles the triggerCheckpoint message from the previous phase; its main job is to trigger the checkpoint barrier.
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
actorMessage match {
case message: TriggerCheckpoint =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
val checkpointOptions = message.getCheckpointOptions
log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
} else {
log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
}
case message: NotifyCheckpointComplete =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.notifyCheckpointComplete(checkpointId)
} else {
log.debug(
s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
}
// unknown checkpoint message
case _ => unhandled(actorMessage)
}
}
Task.triggerCheckpointBarrier is another core method. In this step, it snapshots the source's state and emits the initial barrier downstream.
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
if (invokable instanceof StatefulTask) {
// build a local closure
final StatefulTask statefulTask = (StatefulTask) invokable;
final String taskName = taskNameWithSubtask;
final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
Runnable runnable = new Runnable() {
@Override
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
try {
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
} finally {
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
else {
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
taskNameWithSubtask, executionId);
}
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
}
}
The call stack inside this method looks like this:
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(
StreamTask.java:1162)
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:543)
That is, Task.triggerCheckpointBarrier calls StreamTask.triggerCheckpoint, which is only reached via a trigger request on the source side. It sets up some barrier-alignment parameters and calls performCheckpoint to do the actual checkpointing work. performCheckpoint eventually calls the operator's snapshotState method, the abstract method mentioned at the beginning that stateful code must implement. This method performs the final snapshot and stores it in the state backend.
@Override
public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables())) {
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
}
throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + '.', snapshotException);
}
return snapshotInProgress;
}
3.2 If these steps execute correctly, the task eventually calls, synchronously or asynchronously,
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
which sends the state snapshot to the JobManager in an AcknowledgeCheckpoint message.
If the call
boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
fails, a DeclineCheckpoint message is sent to the JobManager instead.
Phase Four
4.1 Phase four starts when the JobManager receives the checkpoint messages from the tasks.
private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
actorMessage match {
case ackMessage: AcknowledgeCheckpoint =>
val jid = ackMessage.getJob()
currentJobs.get(jid) match {
case Some((graph, _)) =>
val checkpointCoordinator = graph.getCheckpointCoordinator()
if (checkpointCoordinator != null) {
future {
try {
if (!checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
log.info("Received message for non-existing checkpoint " +
ackMessage.getCheckpointId)
}
}
catch {
case t: Throwable =>
log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
}
}(context.dispatcher)
}
else {
log.error(
s"Received AcknowledgeCheckpoint message for job $jid with no " +
s"CheckpointCoordinator")
}
case None => log.error(s"Received AcknowledgeCheckpoint for unavailable job $jid")
}
case declineMessage: DeclineCheckpoint =>
val jid = declineMessage.getJob()
currentJobs.get(jid) match {
case Some((graph, _)) =>
val checkpointCoordinator = graph.getCheckpointCoordinator()
if (checkpointCoordinator != null) {
future {
try {
checkpointCoordinator.receiveDeclineMessage(declineMessage)
}
catch {
case t: Throwable =>
log.error(s"Error in CheckpointCoordinator while processing $declineMessage", t)
}
}(context.dispatcher)
}
else {
log.error(
s"Received DeclineCheckpoint message for job $jid with no CheckpointCoordinator")
}
case None => log.error(s"Received DeclineCheckpoint for unavailable job $jid")
}
// unknown checkpoint message
case _ => unhandled(actorMessage)
}
}
If the message is an AcknowledgeCheckpoint confirmation from a task, CheckpointCoordinator.receiveAcknowledgeMessage is called, which waits until the ACK messages of all tasks have been confirmed:
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
Once the ACKs from all tasks have been confirmed, the pendingCheckpoint is converted into a completedCheckpoint.
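The acknowledgement bookkeeping behind isFullyAcknowledged can be pictured with a toy model (illustration only, not the real PendingCheckpoint class; the task IDs are hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of a pending checkpoint: it is complete once every task
// that must acknowledge has done so.
public class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(Set<String> tasksToWaitFor) {
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    // Returns true if this ack was expected and is not a duplicate.
    boolean acknowledgeTask(String taskId) {
        return notYetAcknowledged.remove(taskId);
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    public static void main(String[] args) {
        PendingCheckpointSketch cp =
            new PendingCheckpointSketch(Set.of("source-0", "map-0", "sink-0"));
        cp.acknowledgeTask("source-0");
        cp.acknowledgeTask("map-0");
        System.out.println(cp.isFullyAcknowledged()); // false: still waiting for sink-0
        cp.acknowledgeTask("sink-0");
        System.out.println(cp.isFullyAcknowledged()); // true: convert to a completed checkpoint
    }
}
```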
4.2 If the conversion to a completedCheckpoint succeeds, a notifyCheckpointComplete message is sent back to the tasks, telling them that the checkpoint is complete and has been recorded by the JobManager.
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
4.3 When the TaskManager receives notifyCheckpointComplete, it invokes the task's notifyCheckpointComplete method, which ultimately calls notifyCheckpointComplete on every operator of the task. With that, one full checkpoint cycle is finished.
case message: NotifyCheckpointComplete =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.notifyCheckpointComplete(checkpointId)
} else {
log.debug(
s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
}
Runtime Checkpoint Triggering
Recall from step 4.1 that when the JobManager receives a task's checkpoint message, and the message is an ACK, it must wait for the ACKs of all tasks before converting the PendingCheckpoint into a CompletedCheckpoint. Step 3.1, on the other hand, described how the checkpoint is triggered at the source task, and the call stack listed there is source-specific as well. So how do the tasks downstream of the source trigger their checkpoints?
As noted earlier, when the source triggers a checkpoint it creates the initial barrier and emits it downstream; this barrier is exactly what triggers the checkpoint in downstream tasks. Unlike the source, which is triggered actively by the CheckpointCoordinator's timer, downstream operators are triggered at runtime. When a downstream operator receives a barrier from upstream, it knows it is now between the previous checkpoint and the next one, and it performs the barrier alignment described in basic question 3 (required for exactly-once). Flink provides the CheckpointBarrierHandler class to handle barrier events; in applications requiring exactly-once semantics, its implementation BarrierBuffer performs barrier alignment and barrier-event handling.
BarrierBuffer's core method is the overridden getNextNonBlocked:
public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        BufferOrEvent next;
        // if there is no current buffered sequence, read from the input gate
        if (currentBuffered == null) {
            next = inputGate.getNextBufferOrEvent();
        }
        // otherwise read from the buffered sequence
        else {
            next = currentBuffered.getNext();
            // null means the buffered sequence is exhausted
            if (next == null) {
                // dispose of the current sequence and open the next one, if any
                completeBufferedSequence();
                // recurse to process the next element
                return getNextNonBlocked();
            }
        }
        // we got a non-null element
        if (next != null) {
            // if the element's channel is currently blocked, the element is buffered
            if (isBlocked(next.getChannelIndex())) {
                // if the channel is blocked, we just store the BufferOrEvent
                bufferSpiller.add(next);
            }
            // a regular record (not a barrier or event) is returned directly
            else if (next.isBuffer()) {
                return next;
            }
            // a checkpoint barrier
            else if (next.getEvent().getClass() == CheckpointBarrier.class) {
                // handle the barrier only if the stream has not ended yet
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                }
            }
            else {
                // an event signalling the end of a partition
                if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
                    // count one more closed channel
                    numClosedChannels++;
                    // no chance to complete this checkpoint, so unblock the channels
                    releaseBlocks();
                }
                // return the event
                return next;
            }
        }
        // next is null and the end-of-stream flag is not yet set
        else if (!endOfStream) {
            // end of stream. we feed the data that is still buffered
            endOfStream = true;
            // unblock the channels; the buffered data is queued up for processing
            releaseBlocks();
            // fetch the next element to process
            return getNextNonBlocked();
        }
        else {
            return null;
        }
    }
}
getNextNonBlocked calls processBarrier to handle a barrier:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
if (totalNumberOfInputChannels == 1) {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
notifyCheckpoint(receivedBarrier);
}
return;
}
// -- general code path for multiple input channels --
if (numBarriersReceived > 0) {
// this is only true if some alignment is already progress and was not canceled
if (barrierId == currentCheckpointId) {
// regular case
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
// begin a the new checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
return;
}
}
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
return;
}
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
notifyCheckpoint(receivedBarrier);
}
}
processBarrier distinguishes between the single-input-channel case and the general multi-channel case; if the conditions are met, it eventually calls notifyCheckpoint, which in turn calls StreamTask.triggerCheckpointOnBarrier. Do not confuse this with the triggerCheckpointBarrier method of step 3.1.
The call stack of a downstream operator's checkpoint looks like this:
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
The only difference from step 3.1 is the entry method, triggerCheckpointOnBarrier; the subsequent steps are identical. Once barrier alignment is done and the operator state has been snapshotted, this task also sends an ACK message to the JobManager. When the barrier has propagated to the sinks, the sinks have processed it, and the ACK messages for all sink operators' states have been confirmed, step 4.1 performs the final conversion of the PendingCheckpoint, and one checkpoint cycle is truly complete.
This article was originally published on my personal WeChat public account, 「梦回少年」.