深圳幻海软件技术有限公司 欢迎您!

图解 Kafka 源码之服务端启动流程

2023-09-05

前面「八篇」文章通过「场景驱动方式」带你深度剖析了Kafka「日志系统」源码架构设计的方方面面,从今天开始,我们来深度剖析Kafka「Controller」的底层源码实现,这是Controller系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看Kafka服务端

前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的方方面面,从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka  服务端启动的流程」,看看 Kafka 服务端是如何启动的。

一、总体概述

在深入剖析Kafka「Controller」之前,我想你可能或多或少会有这样的疑问:

Kafka  服务端都有哪些组件,这些组件又是通过哪个类来启动的呢?

这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动

# 1、启动 kafka 服务命令:
bin/kafka-server-start.sh config/server.properties &
  • 1.
  • 2.

那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。

二、kafka-server-start.sh

我们来看下里面的 shell 内容,如下:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注释说明该脚本的版权信息和使用许可。
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
# 2、检查命令行参数的个数,若小于 1 则输出脚本的使用方法并退出。
base_dir=$(dirname $0)
# 3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
    export JMX_RMI_PORT="10000"
fi
# 5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、检查命令行参数中 COMMAND 变量的值是否为 -daemon,若是则将 EXTRA_ARGS 变量的值添加 -daemon 选项。同时将命令行参数向左移一位,即从 $2 开始计算参数。
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.

这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。

三、kafka.Kafka 类

「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。

这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {
  // 创建一个命令行参数解析器
  val optionParser = new OptionParser(false)
  // 定义 --override 选项,用于覆盖 server.properties 文件中的属性
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
   
  // 定义 --version 选项,用于打印版本信息并退出
  optionParser.accepts("version", "Print version information and exit.")
  // 若没有提供参数或者参数包含 --help 选项,则打印用法并退出
  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }
  // 若参数中包含 --version 选项,则打印版本信息并退出
  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }
  // 加载 server.properties 文件中的属性到 Properties 对象中
  val props = Utils.loadProps(args(0))
  // 若提供了其他参数,则解析这些参数
  if (args.length > 1) {
    // 解析参数中的选项和参数值
    val options = optionParser.parse(args.slice(1, args.length): _*)
    // 检查是否有非选项参数
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    // 将解析得到的选项和参数值添加到 props 对象中
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  // 返回解析得到的属性集合
  props
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.

该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。

如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。

如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {
    val config = KafkaConfig.fromProps(props, false)
    // 直接启动定时任务、网络层、请求处理层
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = false
      )
    } else {
      // 调用 BrokerServer 等来启动网络层和请求处理层
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None
      )
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.

在 kafka 2.8.x 版本中 新增了 raft 协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。

3、main

# 2.7.x 版本源码
def main(args: Array[String]): Unit = {
  try {
    // 1、解析命令行参数,获得属性集合
    val serverProps = getPropsFromArgs(args)
    // 2、从属性集合创建 KafkaServerStartable 对象
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    try {
      // 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandler
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      // 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法
    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
    // 4、启动 KafkaServerStartable 实例
    kafkaServerStartable.startup()
    // 5、等待 KafkaServerStartable 实例终止
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    // 如果有异常发生,则记录日志并退出程序
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  // 6、正常终止程序
  Exit.exit(0)
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.

该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。

# 2.8.x 版本
def main(args: Array[String]): Unit = {
    // 获取Kafka服务的配置信息
    val serverProps = getPropsFromArgs(args)
    // 根据配置信息构建Kafka服务
    val server = buildServer(serverProps)
    try {
      // 注册用于记录日志的信号处理器(若实现失败则退出)
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 挂载关闭处理器,用于捕获终止信号和常规终止请求
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown() // 关闭Kafka服务
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.") // 日志记录致命错误信息
          // 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁
          Exit.halt(1)
      }
    })
    try server.startup() // 启动Kafka服务
    catch {
      case _: Throwable =>
        // 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown()
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
    server.awaitShutdown() // 等待Kafka服务关闭
    Exit.exit(0) // 调用Exit.exit()设置退出状态码
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.

这里最重要的是 「第 4 步」,调用 kafkaServerStartable.startup() 或者 server.startup() 来启动 kafka。

这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。

四、KafkaServerStartable

「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。

这里我们主要来看下 Class 类代码。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  // 创建 KafkaServer 实例
  // 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
  // 启动 KafkaServer
  // startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。
  def startup(): Unit = {
    try server.startup()
    catch {
      // 如果出现异常,则记录日志并退出程序
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }
  // 关闭 KafkaServer
  // shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      // 如果出现异常,则记录日志并强制退出程序
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }
  // setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }
  // 等待 KafkaServer 退出
  // awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。
  def awaitShutdown(): Unit = server.awaitShutdown()
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.

KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。

跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。

五、KafkaServer 类

Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。

「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报
                  ) extends Logging with KafkaMetricsGroup {
  //标识节点已经启动完成
  private val startupComplete = new AtomicBoolean(false)
  //标识节点正在执行关闭操作
  private val isShuttingDown = new AtomicBoolean(false)
  //标识节点正在执行启动操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主线程等待 KafkaServer 的关闭
  private var shutdownLatch = new CountDownLatch(1)
  //日志上下文
  private var logContext: LogContext = null
  var metrics: Metrics = null
  //记录节点的当前状态
  val brokerState: BrokerState = new BrokerState
  //API接口类,用于处理数据类请求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于处理控制类请求
  var controlPlaneRequestProcessor: KafkaApis = null
  //权限管理
  var authorizer: Option[Authorizer] = None
  //启动socket,监听9092端口,等待接收客户端请求 
  var socketServer: SocketServer = null
  //数据类请求处理线程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令类处理线程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null
  //副本管理器
  var replicaManager: ReplicaManager = null
  //topic增删管理器
  var adminManager: AdminManager = null
  //token管理器
  var tokenManager: DelegationTokenManager = null
  //动态配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  //分组协调器
  var groupCoordinator: GroupCoordinator = null
  //事务协调器
  var transactionCoordinator: TransactionCoordinator = null
  //集群控制器
  var kafkaController: KafkaController = null
  //定时任务调度器
  var kafkaScheduler: KafkaScheduler = null
  //集群分区状态信息缓存
  var metadataCache: MetadataCache = null
  //配额管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null
  //zk客户端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null
  def clusterId: String = _clusterId
  // Visible for testing
  private[kafka] def zkClient = _zkClient
  private[kafka] def brokerTopicStats = _brokerTopicStats
  ....
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.

1、startup

该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    try {
      info("starting")
      // 是否已关闭
      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
      // 是否已启动
      if (startupComplete.get)
        return
      // 是否可以启动
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) { // 设置broker状态为Starting
        brokerState.newState(Starting)
        /* setup zookeeper */
        // 连接ZK,并创建根节点
        initZkClient(time)
        /* initialize features */
        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
        if (config.isFeatureVersioningSupported) {
          _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
        }
        /* Get or create cluster_id */
        // 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")
        /* load metadata */
        // 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
        // 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        // 配置logger
        this.logIdent = logContext.logPrefix
        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 初始化AdminZkClient,支持动态修改配置 
        config.dynamicConfig.initialize(zkClient)
        /* start scheduler */
        // 初始化定时任务调度器
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()
        /* create and configure metrics */
        // 创建及配置监控,默认使用JMX及Yammer Metrics
        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
        kafkaYammerMetrics.configure(config.originals)
        val jmxReporter = new JmxReporter()
        jmxReporter.configure(config.originals)
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(jmxReporter)
        val metricConfig = KafkaServer.metricConfig(config)
        val metricsContext = createKafkaMetricsContext()
        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats
        // 初始化配额管理器
        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
        // 用于保证kafka-log数据目录的存在
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        /* start log manager */
        // 启动日志管理器,kafka的消息以日志形式存储
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        // 启动日志清理、刷新、校验、恢复等的定时线程
        logManager.startup()
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        // SCRAM认证方式的token缓存
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 启动socket,监听9092端口,等待接收客户端请求 
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startProcessingRequests = false)
        /* start replica manager */
        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
        // 启动副本管理器,高可用相关
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()
        brokerToControllerChannelManager.start()
        // 将broker信息注册到ZK上
        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)
        // Now that the broker is successfully registered, checkpoint its metadata
        // 校验 broker 信息
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
        /* start token manager */
        // 启动 token 管理器
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
        tokenManager.startup()
        /* start kafka controller */
        // 启动Kafka控制器,只有 Leader 会与ZK建连
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
        kafkaController.startup()
        // admin管理器
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 启动集群群组协调器
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 启动事务协调器
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()
        /* Get the authorizer and initialize it if one is specified.*/
        // ACL
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            brokerInfo.broker.endPoints.map { ep =>
              ep.toJava -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // 创建拉取管理器
        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
        /* start processing requests */
        // 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
        // 初始化数据类请求处理的线程池  
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          // 初始化控制类请求的 KafkaApis
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
          // 初始化控制类请求的线程池
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
        }
        Mx4jLoader.maybeLoad()
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        // 启动动态配置处理器
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
        // 启动请求处理线程
        socketServer.startProcessingRequests(authorizerFutures)
        // 更新broker状态
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
 }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.

这里总结下该方法都启动了哪些组件:

  • initZkClient(time) 初始化 Zk。
  • kafkaScheduler  定时器。
  • logManager 日志模块。
  • MetadataCache  元数据缓存。
  • socketServer 网络服务器。
  • replicaManager 副本模块。
  • kafkaController 控制器。
  • groupCoordinator 协调器用于和ConsumerCoordinator 交互
  • transactionCoordinator 事务相关
  • fetchManager  副本拉取管理器。
  • dynamicConfigManager 动态配置管理器。

2、Broker 状态

这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • NotRunning:初始状态,标识当前 broker 节点未运行。
  • Starting:标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RuningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。

这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。

六、总结

这里,我们一起来总结一下这篇文章的重点。

  • 文章开头通过对「kafka-server-start.sh」内容进行剖析,引出了 「kafka.Kafka」类。
  • 在「kafka.Kafka」的 main 方法中调用了「KafkaServerStartable」尝试启动 Kafka 服务器。
  • 接着在 「KafkaServerStartable」的 startup 方法中调用了 「KafkaServer」的 startup 方法启动服务器需要的各种组件类。

下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。