深圳幻海软件技术有限公司 欢迎您!

万字长文解析Kafka分区工作机制

2023-02-28

Kafka的消息发送与消息消费与分区关联密切,我们从这篇文章开始讲点学习分区相关的知识,本篇文章将重点介绍分区内部的工作机制,即分区状态机运转机制。1、Kafka分区状态Kafka内部分区的运转机制具体实现为PartitionStateMachine,从这个类的注释上来看可以得知Kafka分区的状态

Kafka的消息发送与消息消费与分区关联密切,我们从这篇文章开始讲点学习分区相关的知识,本篇文章将重点介绍分区内部的工作机制,即分区状态机运转机制。

1、Kafka分区状态

Kafka内部分区的运转机制具体实现为PartitionStateMachine,从这个类的注释上来看可以得知Kafka分区的状态共有四个,它们分别是:

  • NonExistentPartition 表示分区不存在,通常是该分区从未创建过或者创建后被删除。
  • NewPartition 分区已创建,即分配完成了副本,但还未进行分区Leader选举,即还不存在Leader分区与ISR集合,前一个有效状态为NonExistentPartition。
  • OnlinePartition 分区处于在线时的状态,表示已经完成了分区选举,成功选举出Leader,此时可以进行消息发送与消息消费,前一个有效状态为NewPartition/OfflinePartition。
  • OfflinePartition分区处于离线时状态,表示选举出来的Leader失效了,例如Leader所在的Broker宕机,前一个有效状态为NewPartition/OnlinePartition。

关于分区的状态变如下所示:

2、Kafka分区状态机

接下来本文的行为思路,将会通过源码阅读的方式,深入PartitionStateMachine的实现细节,从而提炼出分区变更实现要点,帮助我们更好的运维kafka。

2.1 状态机启动流程

状态机的启动流程定义在PartitionStateMachine的startup方法,该方法的调用时机:一个新的Broker通过控制器选举成为新的Controller时会被调用。

该方法的声明如下:

状态机的启动主要包括两个步骤:

  • 初始化分区的状态
  • 触发分区状态向OnlinePartition转换

接下来将详细探讨实现细节。

2.1.1 分区状态初始化

首先我们来看一下分区的初始化流程,具体代码如下所示:

该方法的实现要点:

  • 在KafkaController中使用来ControllerContext用来在内存中存储与控制器相关的数据结构,其中Map[String, mutable.Map[Int, Seq[Int]]]  partitionReplicaAssignmentUnderlying存储了当前集群中所有的分区信息(主题名称、分区编号,副本数情况),既然是控制器重新选举,故需要重新初始化所有的分区。
  • 然后根据 Map[TopicPartition, LeaderIsrAndControllerEpoch] partitionLeadershipInfo中存储各个分区当前的运行时状态,这里分成三种情况:

如果partitionLeadershipInfo中并不存在主题分区的Leaer和ISR信息,驱动状态从NonExistentPartition转换为NewPartition。

如果partitionLeadershipInfo中存在主题分区的leader信息,但对应的Broker已经为下线状态,则驱动状态从NonExistentPartition转换为OfflinePartition。

如果partitionLeadershipInfo中存在主题分区的leader信息,但对应的Broker已经为下线状态,则将状态从NonExistentPartition先转换为OfflinePartition。

值得注意的是,调用changeStateTo方法改变分区的状态,仅仅只是在内存中更新状态,其具体实现如图所示:

具体的做好是将需要更新的状态存储到Map[TopicPartition, PartitionState] 中。

2.1.2 分区状态运转机制

在内存中根据当前维护的LeaderAndISR信息后将状态存储到本地内存后,接下来就是将分区状态向Online状态转换,具体的代码实现见PartitionStateMachine的triggerOnlinePartitionStateChange方法,代码如下所示:

该方法的实现要点是在内存缓存中(Map[TopicPartition, PartitionState] )挑选出状态处于OfflinePartition与NewPartition并且未被删除的分区,驱动状态机,调用handleStateChanges方法尝试向OnlinePartition分区转化。

该方法主要做如下两件事情:

  • 调用PartitionStateMachine的doHandleStateChanges的方法,驱动分区状态机的转换。
  • 然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法,实现元信息在其他Broker上的同步

要想清晰而全面的了解分区状态的变更,我还给出了Kafka中所有调用handleStateChanges的调用入口,在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:

由于篇幅的问题,分区信息在其他Broker中的状态同步将在下一篇文章中介绍。

PartitionStateMachine的doHandleStateChanges方法在上一篇中已经详细介绍,尴尬,在Kafka生产实践中又出问题了 中详细介绍过,在这里我稍微总结提炼一下:

目标状态为NewPartition、OfflinePartition、NonExistentPartition 这三个状态并没有什么复杂的实现逻辑,只是更新内存中的状态,并在state-change.log文件中将输出状态变更日志,只有目标状态为OnlinePartition时才会详细的处理逻辑。

但或许你有一个疑问,状态变更为NewPartition,什么时候会向OnlinePartition状态转换呢?其实通过调用doHandleStateChanges将目标方法设置为NewPartition后,会紧接着调用triggerOnlinePartitionStateChange等方法,将状态进一步向OnlinePartition状态转化。

由于在尴尬,在Kafka生产实践中又出问题了 这篇文章中详细介绍了OfflinePartition向OnlinePartition的转化流程,故本篇文章就将重点放在了NewPartition状态向OnlinePartition的转化处理逻辑,其实也就是分区创建的流程,这块的代码入口如下所示:

由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,接下来将分步讲解。

2.1.3 分区初始化流程

接下来我们详细探讨PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。

Step1:首先获取所有分区对应的在线副本,Seq< Map< TopicPartition, Seq< Int>> > liveReplicasPerPartition 来表示,类比Java的数据结构为List< Map< TopicPartition, List< Interger> >,代码如下所示:

在Kafka中创建一个主题时,kafka首先会根据集群节点的负载情况,根据主题的分区数、副本数,物理机架等信息,生成静态负载情况,存储在/brokers/topics/{topicName},其数据如下图所示:

而liveReplicasPerPartition是在这个数据结构的基础上筛选出在线的broker,例如如果id为4的broker已下线,那么liveReplicasPerPartition中的值就可能如下所示:

["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]
  • 1.

Step2:如果一个分区所有预分配的分片都不在线,则打印错误日志,代码如下所示:

Step3:为分区创建leaderIsrAndControllerEpoch信息,代码如下所示:

这里的实现比较简单,值得注意的是初始化时分区的Leader则为ISR列表中的第一个分区。

Step4:将分区的状态信息 leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch、ControllerEpoch)写入到zookeeper中,具体代码如下;

具体就是在zookeeper中创建/broker/topics/{topicName}/partitions/{分区序号}/state,并将leaderIsrAndControllerEpoch写入到上述节点,具体效果如下图所示:

Step5:对zookeeper写入结果进行处理,对应的代码如下所示:

如果在zookeeper中创建成功,将leaderIsrAndControllerEpoch信息缓存到内存中(Map< TopicPartition, leaderIsrAndControllerEpoch>)中,并将信息放入到controllerBrokerRequestBatch,Kafka Broker控制将信息同步到集群的其他Broker上,同时会在state-change.log日志文件中记录状态成功变更日志;如果创建失败,则在state-change.log中输出对应的错误日志。

当然:为了尽量保证上述过程成功创建,Zookeeper的写入过程引入来重试机制来保证最终执行成功,除非一些类似AUTH_FAILED等不可恢复的异常。

分区的信息写入到zookeeper的/broker/topics/{topicName}/partitions/{分区序号}/state文件路径后,会再次调用changeTo方法,在内存中将分区的状态变更为OnlineParttion。

那在什么时候触发真正创建分区相关的文件夹呢?

原来在将分区信息写入到zookeeper指定文件后,由于Kafka Controller订阅了/broker/topics/{topicName}相关节点,故节点的创建会实时告知Kafka Controller,从而执行分区的选择,具体的代码如下所示:

通过Zookeeper的事件监听机制,kafka就这样巧妙的实现了分区状态机的切换。

3、总结

通过上面的学习,我们对分区的理解应该更加深刻了,从这里我们至少能得出如下结论:

分区的状态主要包括NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition四个状态,只有分区状态为OnlinePartition才能对外提供读与写。

Kafka启动时,在选举好集群的控制器(Kafka Controller)后会启动分区状态机(PartitionStateMachine),Kafka会根据/brokers/topics/{topicName}/partitions/{partition_no}/state中的信息,驱动分区状态向OnlineParttion转换。

当新创建主题时,Kafka会根据当前集群的负载情况,主题需要创建的分区数量、副本数量,机架信息等,进行负载均衡,生成分区的意向leader,已经分区副本的分布情况,写入到/brokers/topics/{topicName}节点上,此时会触发PartitionModifications,从而触发分区创建流程,即从NewPartition向OnlineParttion转换。