一文搞懂 Flink 处理 Barrier 全过程
上次我们讲到了 Flink Checkpoint Barrier 全流程 还有 Flink 消费消息的全流程
Flink 处理 Barrier 分两种:
关键就是 getNextNonBlocked 方法
当没有发生 barrier 对齐完成 这个动作时,currentBuffered == null,currentBuffered 就是当前要处理的 buffer,当 buffer 是数据的时候它就正常消费数据走 Flink 消费消息的全流程 ,当遇到 barrier 时,开始处理 barrier
numBarriersReceived 的默认值是0,所以第一个 barrier 进来后,会进入 beginNewAlignment 方法
当再有其他相同的 barrier 进入时,barrierId == currentCheckpointId 为 true,直到 numBarriersReceived + numClosedChannels == totalNumberOfInputChannels 时,触发 notifyCheckpoint,并报告 alignment buffer 以及 alignment time。(彩蛋: 稍后会更新 checkpoint 全流程欢迎关注 )。
如果其他的 channel 中的 barrier 延迟了,即 numBarriersReceived + numClosedChannels != totalNumberOfInputChannels,已经 receive barrier 对应的 channel 数据会进入 bufferBlocker。
bufferBlocker 是通过 ArrayDeque<BufferOrEvent> currentBuffers 来存储数据的,也就是说默认情况下 bufferBlocker.currentBuffers 会无限增大。
当 numBarriersReceived + numClosedChannels == totalNumberOfInputChannels 时,会先进行 releaseBlocksAndResetBarriers() 在进行 notifyCheckpoint。
releaseBlocksAndResetBarriers 主要的目的是要先消费已加入缓存中的数据。
当执行完 releaseBlocksAndResetBarriers 方法时,currentBuffered!=null 了,会进入
然后直接消费数据
一直消费缓存中的数据( 此过程会阻塞不会继续消费 inputGate 中的数据),直至消耗完成
完成了之后,就跟程序第一次运行至此一样,循环往复。