前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的方方面面,从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看 Kafka 服务端是如何启动的。
一、总体概述
在深入剖析Kafka「Controller」之前,我想你可能或多或少会有这样的疑问:
Kafka 服务端都有哪些组件,这些组件又是通过哪个类来启动的呢?
这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动:
# 1、启动 kafka 服务命令:
bin/kafka-server-start.sh config/server.properties &
- 1.
- 2.
那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。
二、kafka-server-start.sh
我们来看下里面的 shell 内容,如下:
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注释说明该脚本的版权信息和使用许可。
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
# 2、检查命令行参数的个数,若小于 1 则输出脚本的使用方法并退出。
base_dir=$(dirname $0)
# 3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
export JMX_RMI_PORT="10000"
fi
# 5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、检查命令行参数中 COMMAND 变量的值是否为 -daemon,若是则将 EXTRA_ARGS 变量的值添加 -daemon 选项。同时将命令行参数向左移一位,即从 $2 开始计算参数。
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。
三、kafka.Kafka 类
「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。
从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。
这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。
1、getPropsFromArgs
def getPropsFromArgs(args: Array[String]): Properties = {
// 创建一个命令行参数解析器
val optionParser = new OptionParser(false)
// 定义 --override 选项,用于覆盖 server.properties 文件中的属性
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])
// 定义 --version 选项,用于打印版本信息并退出
optionParser.accepts("version", "Print version information and exit.")
// 若没有提供参数或者参数包含 --help 选项,则打印用法并退出
if (args.length == 0 || args.contains("--help")) {
CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
}
// 若参数中包含 --version 选项,则打印版本信息并退出
if (args.contains("--version")) {
CommandLineUtils.printVersionAndDie()
}
// 加载 server.properties 文件中的属性到 Properties 对象中
val props = Utils.loadProps(args(0))
// 若提供了其他参数,则解析这些参数
if (args.length > 1) {
// 解析参数中的选项和参数值
val options = optionParser.parse(args.slice(1, args.length): _*)
// 检查是否有非选项参数
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
// 将解析得到的选项和参数值添加到 props 对象中
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
// 返回解析得到的属性集合
props
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。
如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。
如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。
2、buildServer
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
// 直接启动定时任务、网络层、请求处理层
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = false
)
} else {
// 调用 BrokerServer 等来启动网络层和请求处理层
new KafkaRaftServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
在 kafka 2.8.x 版本中 新增了 raft 协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。
3、main
# 2.7.x 版本源码
def main(args: Array[String]): Unit = {
try {
// 1、解析命令行参数,获得属性集合
val serverProps = getPropsFromArgs(args)
// 2、从属性集合创建 KafkaServerStartable 对象
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
try {
// 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandler
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
// 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
// 4、启动 KafkaServerStartable 实例
kafkaServerStartable.startup()
// 5、等待 KafkaServerStartable 实例终止
kafkaServerStartable.awaitShutdown()
}
catch {
// 如果有异常发生,则记录日志并退出程序
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
// 6、正常终止程序
Exit.exit(0)
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。
# 2.8.x 版本
def main(args: Array[String]): Unit = {
// 获取Kafka服务的配置信息
val serverProps = getPropsFromArgs(args)
// 根据配置信息构建Kafka服务
val server = buildServer(serverProps)
try {
// 注册用于记录日志的信号处理器(若实现失败则退出)
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 挂载关闭处理器,用于捕获终止信号和常规终止请求
Exit.addShutdownHook("kafka-shutdown-hook", {
try server.shutdown() // 关闭Kafka服务
catch {
case _: Throwable =>
fatal("Halting Kafka.") // 日志记录致命错误信息
// 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁
Exit.halt(1)
}
})
try server.startup() // 启动Kafka服务
catch {
case _: Throwable =>
// 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown()
fatal("Exiting Kafka.")
Exit.exit(1)
}
server.awaitShutdown() // 等待Kafka服务关闭
Exit.exit(0) // 调用Exit.exit()设置退出状态码
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
这里最重要的是 「第 4 步」,调用 kafkaServerStartable.startup() 或者 server.startup() 来启动 kafka。
这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。
四、KafkaServerStartable
「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。
在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。
这里我们主要来看下 Class 类代码。
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
// 创建 KafkaServer 实例
// 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
// 启动 KafkaServer
// startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。
def startup(): Unit = {
try server.startup()
catch {
// 如果出现异常,则记录日志并退出程序
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
// 关闭 KafkaServer
// shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。
def shutdown(): Unit = {
try server.shutdown()
catch {
// 如果出现异常,则记录日志并强制退出程序
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
// setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。
def setServerState(newState: Byte): Unit = {
server.brokerState.newState(newState)
}
// 等待 KafkaServer 退出
// awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。
def awaitShutdown(): Unit = server.awaitShutdown()
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。
跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。
五、KafkaServer 类
Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。
「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。
KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。
class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报
) extends Logging with KafkaMetricsGroup {
//标识节点已经启动完成
private val startupComplete = new AtomicBoolean(false)
//标识节点正在执行关闭操作
private val isShuttingDown = new AtomicBoolean(false)
//标识节点正在执行启动操作
private val isStartingUp = new AtomicBoolean(false)
//阻塞主线程等待 KafkaServer 的关闭
private var shutdownLatch = new CountDownLatch(1)
//日志上下文
private var logContext: LogContext = null
var metrics: Metrics = null
//记录节点的当前状态
val brokerState: BrokerState = new BrokerState
//API接口类,用于处理数据类请求
var dataPlaneRequestProcessor: KafkaApis = null
//API接口,用于处理控制类请求
var controlPlaneRequestProcessor: KafkaApis = null
//权限管理
var authorizer: Option[Authorizer] = None
//启动socket,监听9092端口,等待接收客户端请求
var socketServer: SocketServer = null
//数据类请求处理线程池
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//命令类处理线程池
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//日志管理器
var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
//副本管理器
var replicaManager: ReplicaManager = null
//topic增删管理器
var adminManager: AdminManager = null
//token管理器
var tokenManager: DelegationTokenManager = null
//动态配置管理器
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null
//分组协调器
var groupCoordinator: GroupCoordinator = null
//事务协调器
var transactionCoordinator: TransactionCoordinator = null
//集群控制器
var kafkaController: KafkaController = null
//定时任务调度器
var kafkaScheduler: KafkaScheduler = null
//集群分区状态信息缓存
var metadataCache: MetadataCache = null
//配额管理器
var quotaManagers: QuotaFactory.QuotaManagers = null
//zk客户端配置
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
private var _brokerTopicStats: BrokerTopicStats = null
def clusterId: String = _clusterId
// Visible for testing
private[kafka] def zkClient = _zkClient
private[kafka] def brokerTopicStats = _brokerTopicStats
....
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
1、startup
该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup(): Unit = {
try {
info("starting")
// 是否已关闭
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
// 是否已启动
if (startupComplete.get)
return
// 是否可以启动
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) { // 设置broker状态为Starting
brokerState.newState(Starting)
/* setup zookeeper */
// 连接ZK,并创建根节点
initZkClient(time)
/* initialize features */
_featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
if (config.isFeatureVersioningSupported) {
_featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
}
/* Get or create cluster_id */
// 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* load metadata */
// 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
// 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
/* check cluster id */
if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
throw new InconsistentClusterIdException(
s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
/* generate brokerId */
config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
// 配置logger
this.logIdent = logContext.logPrefix
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after DynamicConfigManager starts.
// 初始化AdminZkClient,支持动态修改配置
config.dynamicConfig.initialize(zkClient)
/* start scheduler */
// 初始化定时任务调度器
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* create and configure metrics */
// 创建及配置监控,默认使用JMX及Yammer Metrics
kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
kafkaYammerMetrics.configure(config.originals)
val jmxReporter = new JmxReporter()
jmxReporter.configure(config.originals)
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(jmxReporter)
val metricConfig = KafkaServer.metricConfig(config)
val metricsContext = createKafkaMetricsContext()
metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
// 初始化配额管理器
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
// 用于保证kafka-log数据目录的存在
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
/* start log manager */
// 启动日志管理器,kafka的消息以日志形式存储
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
// 启动日志清理、刷新、校验、恢复等的定时线程
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
// SCRAM认证方式的token缓存
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// 启动socket,监听9092端口,等待接收客户端请求
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
// 启动副本管理器,高可用相关
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
brokerToControllerChannelManager.start()
// 将broker信息注册到ZK上
val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)
// Now that the broker is successfully registered, checkpoint its metadata
// 校验 broker 信息
checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
/* start token manager */
// 启动 token 管理器
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
// 启动Kafka控制器,只有 Leader 会与ZK建连
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
// admin管理器
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 启动集群群组协调器
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 启动事务协调器
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
// ACL
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
brokerInfo.broker.endPoints.map { ep =>
ep.toJava -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
// 创建拉取管理器
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
// 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化数据类请求处理的线程池
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
// 初始化控制类请求的 KafkaApis
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化控制类请求的线程池
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
}
Mx4jLoader.maybeLoad()
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
// 启动动态配置处理器
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
// 启动请求处理线程
socketServer.startProcessingRequests(authorizerFutures)
// 更新broker状态
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
- 152.
- 153.
- 154.
- 155.
- 156.
- 157.
- 158.
- 159.
- 160.
- 161.
- 162.
- 163.
- 164.
- 165.
- 166.
- 167.
- 168.
- 169.
- 170.
- 171.
- 172.
- 173.
- 174.
- 175.
- 176.
- 177.
- 178.
- 179.
- 180.
- 181.
- 182.
- 183.
这里总结下该方法都启动了哪些组件:
- initZkClient(time) 初始化 Zk。
- kafkaScheduler 定时器。
- logManager 日志模块。
- MetadataCache 元数据缓存。
- socketServer 网络服务器。
- replicaManager 副本模块。
- kafkaController 控制器。
- groupCoordinator 协调器用于和ConsumerCoordinator 交互
- transactionCoordinator 事务相关
- fetchManager 副本拉取管理器。
- dynamicConfigManager 动态配置管理器。
2、Broker 状态
这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- NotRunning:初始状态,标识当前 broker 节点未运行。
- Starting:标识当前 broker 节点正在启动中。
- RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
- RuningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
- PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。
这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。
六、总结
这里,我们一起来总结一下这篇文章的重点。
- 文章开头通过对「kafka-server-start.sh」内容进行剖析,引出了 「kafka.Kafka」类。
- 在「kafka.Kafka」的 main 方法中调用了「KafkaServerStartable」尝试启动 Kafka 服务器。
- 接着在 「KafkaServerStartable」的 startup 方法中调用了 「KafkaServer」的 startup 方法启动服务器需要的各种组件类。
下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。