参考文献

什么是消息中间件

  • 消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步,可靠的消息传输的支撑性软件系统;

    • 消息传递技术: 队列和发布-订阅

      • 队列:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人。
      • 发布-订阅:在这个模型中,消息被广播给所有的用户。
  • 消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息排队模型,他可以在分布式环境下扩展进程间的通信.

什么是Kafka

  • Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。
  • Kafka是Scala语言开发,运行在JVM

Kafka的功能及作用

  • 消息系统:
    • Kafka与传统的消息中间件都具有系统解耦,冗余存储,流量削峰,缓冲,异步通信,扩展性,可恢复性等功能,与此同时,Kafka还提供大多消息系统难以实现的消息顺序性保障以及回溯性消费的功能;
      • 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,Kafka在中间可以起到一个缓冲的作用,把消息暂存在Kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
      • 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
      • 冗余存储:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
      • 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
      • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
  • 存储系统:
    • Kafka把消息持久化到磁盘上,相比与其他基于内存存储的系统而言有效的降低了消息丢失的风险;这得益于其消息持久化和多副本机制.也可以将Kafka作为长期的存储系统来使用,只需要把对应的数据保留策略设置为"永久"或启用主题日志压缩功能;
  • 流式处理平台
    • Kafka为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架;

Kafka的架构

  • 一个典型的Kafka体系架构包括若干Producer,若干Consumer以及一个Zookeeper集群(在2.8.0版本中移除了Zookeeper,通过KRaft进行自己的集群管理);
  • Producer将消息发到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker中订阅并消费消息;
  • Kafka中有两个线程: 主线程和Sender线程
    • 主线程负责拦截器,序列化器,分区器的管理;
      • 拦截器:可以在消息传入Kafka前和producer回调函数返回前对消息进行简单的处理
      • 序列化器:将生产者消息序列化,可以被网络传输
      • 分区器:可以给消息指定传入分区
    • Sender线程负责将分区后的数据发送给对应分区.

Kafka基本概念

  • Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Broker。
  • Consumer: 消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。
  • Consumer Group:
    • 消费者组,一个消费者组可以包含一个或多个消费者。
    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    • Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。
  • Broker: 服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。
  • Topic: 一个逻辑上的概念,Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。
  • Partition: Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。
  • Offset: offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。
  • **Replica:**副本,同一分区的不同副本保存的是相同的消息,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。
  • Leader :每个分区的多个副本中的"主副本",生产者以及消费者只与 Leader 交互
  • Follower :每个分区的多个副本中的"从副本",负责实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,从 Follower 副本中重新选举新的 Leader 副本对外提供服务。
  • Record: 实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 Timestamp。
  • LEO(Log End Offset):每个副本的最后一个offset;
  • HW(High Watermark):所有副本的最小LEO
  • OSR(Out-Sync Relicas):与 Leader 副本同步滞后过多的 Replica 组成了 OSR
  • ISR(In-Sync Replicas): 副本同步队列
  • AR(Assigned Replicas): 所有副本

Kafka Replicas是如何管理的?

  • Leader 负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态,当 Follower 副本落后过多时,就会将其放入 OSR 集合,当 Follower 副本追上了 Leader 的进度时,就会将其放入 ISR 集合。
  • 默认情况下,只有ISR中的副本才有资格晋升为 Leader
  • Follower故障时:Follower会被踢出 ISR ,恢复后,follower读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等Follower的LEO大于等于该partition的HW后,就可以重新加入ISR
  • **Leader故障时:**Leader 发生故障之后,会从 ISR中选出一个新的 leader,之后,为保证多个副本之间的, 数据一致性, 其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。

生产者发送消息有哪些模式?

  • 发后即忘(fire-and-forget)
    • 它只管往 Kafka 里面发送消息,但是不关心消息是否正确到达,这种方式的效率最高,但是可靠性也最差,比如当发生某些不可充实异常的时候会造成消息的丢失
  • 同步(sync)
    • producer.send()返回一个Future对象,调用get()方法变回进行同步等待,就知道消息是否发送成功,发送一条消息需要等上个消息发送成功后才可以继续发送
  • 异步(async)
    • Kafka支持 producer.send() 传入一个回调函数,消息不管成功或者失败都会调用这个回调函数,这样就算是异步发送,我们也知道消息的发送情况,然后再回调函数中选择记录日志还是重试都取决于调用方

发送消息的分区策略有哪些?

  • 轮询:
    • 依次将消息发送该topic下的所有分区,如果在创建消息的时候 key 为 null,Kafka 默认采用这种策略。
  • Key 指定分区:
    • 在创建消息是 key 不为空,并且使用默认分区器,Kafka 会将 key 进行 hash,然后根据hash值映射到指定的分区上。这样的好处是 key 相同的消息会在一个分区下,Kafka 并不能保证全局有序,但是在每个分区下的消息是有序的,按照顺序存储,按照顺序消费。在保证同一个 key 的消息是有序的,这样基本能满足消息的顺序性的需求。但是如果 partition 数量发生变化,那就很难保证 key 与分区之间的映射关系了
  • 自定义策略:
    • 实现 Partitioner 接口就能自定义分区策略。
  • 指定 Partition发送

Kafka 支持读写分离吗?为什么?

  • Kafka 是不支持读写分离的,那么读写分离的好处是什么?主要就是让一个节点去承担另一个节点的负载压力,也就是能做到一定程度的负载均衡,而且 Kafka 不通过读写分离也可以一定程度上去实现负载均衡。

  • 但是对于 Kafka 的架构来说,读写分离有两个很大的缺点:

    • 数据不一致的问题:读写分离必然涉及到数据的同步,只要是不同节点之间的数据同步,必然会有数据不一致的问题存在。
    • 由于 Kafka 独特的数据处理方式,导致如果将数据从一个节点同步到另一个节点必然会经过主节点磁盘和从节点磁盘,对一些延时性要求较高的应用来说,并不太适用;

Kafka 是怎么去实现负载均衡的?

  • Kafka 的负责均衡主要是通过分区来实现的,我们知道 Kafka 是主写主读的架构,如下图:

    img
    • 共三个 broker ,里面各有三个副本,总共有三个 partition, 深色的是 leader,浅色的是 follower,上下灰色分别代表生产者和消费者,虚线代表 follower 从 leader 拉取消息。

    • 我们从这张图就可以很明显的看出来,每个 broker 都有消费者拉取消息,每个 broker 也都有生产者发送消息,每个 broker 上的读写负载都是一样的,这也说明了 Kafka 独特的架构方式可以通过主写主读来实现负载均衡。

Kafka 的负责均衡会有什么问题呢?

  • Kafka的负载均衡在绝对理想的状况下可以实现,但是会有某些情况出现一定程度上的负载不均衡
    • broker 端分配不均:当创建 topic 的时候可能会出现某些 broker 分配到的分区数多,而有些 broker 分配的分区少,这就导致了 leader 多副本不均。
    • 生产者写入消息不均:生产者可能只对某些 broker 中的 leader 副本进行大量的写入操作,而对其他的 leader 副本不闻不问。
    • 消费者消费不均:消费者可能只对某些 broker 中的 leader 副本进行大量的拉取操作,而对其他的 leader 副本不闻不问。
    • leader 副本切换不均:当主从副本切换或者分区副本进行了重分配后,可能会导致各个 broker 中的 leader 副本分配不均匀。

Kafka 的可靠性是怎么保证的?

  • ack
    • 这个参数用来指定分区中有多少个副本收到这条消息,生产者才认为这条消息是写入成功的,这个参数有三个值:
      • acks = 1,默认为1。生产者发送消息,只要 leader 副本成功写入消息,就代表成功。这种方案的问题在于,当返回成功后,如果 leader 副本和 follower 副本还没有来得及同步,leader 就崩溃了,那么在选举后新的 leader 就没有这条消息,也就丢失了
      • acks = 0生产者发送消息后直接算写入成功,不需要等待响应。这个方案的问题很明显,只要服务端写消息时出现任何问题,都会导致消息丢失
      • acks = -1acks = all。生产者发送消息后,需要等待 ISR 中的所有副本都成功写入消息后才能收到服务端的响应。毫无疑问这种方案的可靠性是最高的,但是如果 ISR 中只有leader 副本,那么就和 acks = 1 毫无差别了。
  • 消息发送方式
    • 生产者发送消息有三种方式,发完即忘,同步和异步。可以通过同步或者异步获取响应结果,失败做重试来保证消息的可靠性;
  • 手动提交位移
    • 默认情况下,当消费者消费到消息后,就会自动提交位移。但是如果消费者消费出错,没有进入真正的业务处理,那么就可能会导致这条消息消费失败,从而丢失。我们可以开启手动提交位移,等待业务正常处理完成后,再提交offset。
  • 通过副本LEO来确定分区HW

Kafka 的消息消费方式有哪些?

  • 一般消息消费有两种模式,推和拉。Kafka的消费是属于拉模式(PULL)的,而此模式的消息消费方式有两种,点对点和发布订阅
    • 点对点:如果所有消费者属于同一个消费组,那么所有的消息都会被均匀的投递给每一个消费者,每条消息只会被其中一个消费者消费
    • 发布订阅:如果所有消费者属于不同的消费组,那么所有的消息都会被投递给每一个消费者,每个消费者都会收到该消息

分区再分配是做什么的?解决了什么问题?

  • 分区再分配主要是用来维护 Kafka 集群的负载均衡

  • 问题1:当集群中的一个节点下线了

    • 如果该节点的分区是单副本的,那么分区将会变得不可用
    • 如果是多副本的,就会进行 leader 选举,在其他机器上选举出新的 leader
    • Kafka 并不会将这些失效的分区迁移到其他可用的 broker 上,这样就会影响集群的负载均衡,甚至也会影响服务的可靠性和可用性
  • 问题2:当集群新增 broker 时,只有新的主题分区会分配在该 broker 上,而老的主题分区不会分配在该 broker 上,就造成了老节点和新节点之间的负载不均衡

    • 为了解决该问题就出现了分区再分配,它可以在集群扩容,broker 失效的场景下进行分区迁移。
  • 分区再分配的原理就是通化控制器给分区新增新的副本,然后通过网络把旧的副本数据复制到新的副本上,在复制完成后,将旧副本清除。 当然,为了不影响集群正常的性能,在此复制期间还会有一些列保证性能的操作,比如复制限流

如何增强消费者的消费能力?

  • 可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数;
  • 如果是消费者消费不及时,可以采用多线程的方式进行消费,并且优化业务方法流程,同样的分区数;

消费者与 Topic 的分区分配策略有哪些?

  • Range Assignor 分配策略

    • 该分配策略是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后分区按照跨度来进行平均分配,尽可能保证分区均匀的分配给所有的消费者。
    • 对于每个 topic,该策略会讲消费者组内所有订阅这个主题的消费者按照名称的字典顺序排序,然后为每个消费者划分固定过的区域,如果不够平均分配,那么字典排序考前的就会多分配一个分区
  • Round Robin Assignor 分配策略

    • 该分配策略是按将消费者组内所有消费者及消费者订阅的所有主题的分区按照字典排序,然后通过轮询的方式分配给每个消费者
  • Sticky Assignor分配策略

    • 这种分配策略有两个目的

      • 1.分区的分配要尽可能的均匀

      • 2.分区的分配尽可能的与上次分配的保持相同。

    • 当两者发生冲突时,第一个目标优先于第二个目标。

  • 自定义分区分配策略

    • 可以通过实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口来实现

Kafka 控制器是什么?有什么作用?

  • 在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器,它负责管理整个集群中所有分区和副本的状态,Kafka 集群中只能有一个控制器
    • 当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本
    • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息
    • 当为某个 topic 增加分区数量时,由控制器负责分区的重新分配

Kafka 控制器是怎么进行选举的?

  • Kafka 中的控制器选举工作依赖于 Zookeeper,成功竞选成为控制器的 broker 会在Zookeeper中创建/controller临时节点。

  • 每个 broker 启动的时候会去尝试读取/controller 节点的brokerid的值

    • 如果读取到的 brokerid 的值不为-1,表示已经有其他broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选
  • 如果Zookeeper不存在/controller 节点,或者这个节点的数据异常,那么就会尝试去创建/controller 节点,创建成功的那个 broker 就会成为控制器

  • 每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId

  • Zookeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 值用于记录控制器发生变更的次数

  • controller_epoch 的初始值为1,即集群中的第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1

  • 每个和控制器交互的请求都会携带 controller_epoch 这个字段,

    • 如果请求的 controller_epoch 值小于内存中的 controller_epoch值认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求
    • 如果请求的 controller_epoch 值大于内存中的 controller_epoch值,那么说明已经有新的控制器当选

什么情况下 Kafka 会丢失消息?

  • Kafka 有三次消息传递的过程:生产者发消息给 BrokerBroker 同步消息和持久化消息Broker 将消息传递给消费者.这其中每一步都有可能丢失消息.
    • 生产者发消息给 Broker
      • acks 为 0,只要服务端写消息时出现任何问题,都会导致消息丢失
      • acks 配置为 1 时,生产者发送消息,只要 leader 副本成功写入消息,就代表成功。这种方案的问题在于,当返回成功后,如果 leader 副本和 follower 副本还没有来得及同步,leader 就崩溃了,那么在选举后新的 leader 就没有这条消息,也就丢失了
    • Broker 同步消息和持久化消息;
      • Broker 存储数据:Kafka 通过 Page Cache 将数据写入磁盘。
      • Page Cache 就是当往磁盘文件写入的时候,系统会先将数据流写入缓存中,但是什么时候将缓存的数据写入文件中是由操作系统自行决定。所以如果此时机器突然挂了,也是会丢失消息的
    • Broker 将消息传递给消费者;
      • 消费者消费数据:在开启自动提交 offset 时,只要消费者消费到消息,那么就会自动提交偏移量,如果业务还没有来得及处理,那么消息就会丢失

什么时候触发分区分配(Rebalance )策略?

  • 当出现以下几种情况时,Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作

    • 同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
    • 消费者离开当期所属的 consumer group组。比如 主动停机 或者 宕机
    • 分区数量发生变化时(即 topic 的分区数量发生变化时)
    • 消费者主动取消订阅
  • Kafka 消费端的 Rebalance 机制,规定了一个 Consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每一个分区。而具体如何执行分区策略,就是上面提到的 Range 范围分区 和 RoundRobin 轮询分区 两种内置的分区策略。Kafka 对于分区分配策略这块,也提供了可插拔式的实现方式,除了上面两种分区分配策略外,我们也可以创建满足自己使用的分区分配策略,即:自定义分区策略。

Kafka使用过程中可能出现的问题

消息丢失
  • 消息丢失会发生在生产端和消费端,生产端消息丢失是因为Broker未接收到生产端发送过来的消息或者未对消息持久化。消费端消息丢失是因为消费端未正确处理消息而提交了Offset。
  • 解决方案
    • 消息丢失在生产端和消费端均会发生。生产端ack设置为0时,生产者无需等待Broker的写入确认,当Leader所在Broker异常时生产者发送的消息均丢失。ack设置为1时,生产者只需等待Leader的写入确认,当Leader所在的Broker发生异常时Follower未同步的消息丢失。分区为单副本的情况下Broker宕机后会丢失消息。还有消费者设置了Offset自动提交,当消费者程序异常时也会丢失消息。解决丢失问题我们要从这三方面入手:
      • 分区至少设置两个副本,保证分区高可用性
      • 设置ack=-1,确保所有的副本均同步生产者的发送来的消息后再返回写入确认信息。做为消息系统使用必须设置为-1,日志收集类系统为提高吞吐量可以选择0或者1。
      • 消费者Offset手动提交。避免在消费者发生异常后丢失未正确消费的消息。Kafka作为消息系统使用必须设置手动。手动提交有三种方式:
        • 同步手动提交偏移量:同步模式下提交失败的时候会一直尝试提交,直到遇到无法重试的情况或者提交成功才会结束。这种方式消费者线程会发生阻塞,出现无法提交的情况时会导致消息积压。

        • 异步手动提交偏移量+回调函数:异步手动提交offset时,消费者线程不会阻塞,提交失败的时候也不会进行重试,但可以配合回调函数在Broker做出响应的时候记录错误信息。

        • 异步+同步组合的方式提交偏移量:消费者进行异步提交并且在关闭时进行同步提交,这样即使上一次的异步提交失败,还可通过同步提交来进行补救。

重复消费
  • 可能产生重复消费的场景:
    • 强行kill线程,导致消费后的数据,offset没有提交,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会reblance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
    • 如果在close之前调用了consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费。
    • Kafka数据重复Kafka设计的时候是设计了(at-least-once)至少一次的逻辑,这样就决定了数据可能是重复的,Kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
    • Kafka的数据重复一般情况下应该在消费者端,这时log.cleanup.policy = delete使用定期删除机制。
  • 解决方案
    • 采用关系型数据库的事务性来保证精确一次更新,比如将消息的Offset或者唯一ID和下游状态更新放在一个库里,通过关系型数据事务性来保证精确一次更新。这种方式适用于消息量较小的系统。
    • 为提高消费效率可以采用Redis等非关系数据库,将消息Offset或者业务主键放入Redis,消费者拉取消息后与Redis存储的消息信息做校验,根据校验结果决定是否消费拉取的消息。
    • 使用状态机。通常在订单类业务系统中会涉及。使用状态机的前提是系统中的状态是有限的,比如一个订单有提交订单、支付(成功、失败)、发货、确认收货等几种状态。状态机的作用就是记录当前订单所处的状态并为下一次变更提供状态校验。比如状态机收到一个状态的变更,而此次变更为状态机记录的前一个状态,那么此次变更视则为无效变更。通过状态机,能够保证有限状态的幂等。
    • 选择具有天然幂性的组件,比如Elasticsearch、Hbase等。
消息积压
  • 消息积压通常是因为数据量突然增大,消费端消费能力不足导致。这里还包括集群异常导致消费端无法正常消费而造成的积压。在Kafka的使用中消息积压也是经常碰到的问题之一,比如消息量突然增大后消费者无法及时消费导致消息积压。
  • 解决方案
    • 通常的做法是在上生产之前先预估一个最大数据量,根据该数据量和消费速度来预分一个分区数,并在预上线环境进行压力测试,根据压测结果动态增加分区数,在数据无积压后再适当增加分区,来应对后期数据增长。
    • 当遇到数据积压时,最简单的方式就动态扩展分区,并相应增加消费者进行消费,但这种方式存在破坏Message Key和分区对应关系的风险。另外一个途径是新建一个主题将积压的数据快速消费到新的主题,然后再增加新的消费者来消费新主题中的数据,缓解积压情况。
      • 这里需要说明一下不是分区越多越好,分区数越多平均到每一个Broker上的数量越多,Broker压力也会越大堆内存占用也越多,在Broker宕机后Leader选举会占用更多时间,选举期间读写都会发生异常可能造成数据丢失。
顺序消费
  • Kafka是一个队列,理论上消费顺序是不会错乱的。但是由于分布式的原因,Kafka的顺序消息只在单个分区内保证,整个主题消息的顺序性是无法保证的。因此,在多个消费者多分区的情况下,无法保证全局消息被顺序消费。
  • 解决消息的顺序问题,可以将相同业务主键的消息发送到同一分区上,来保证相同业务主键消息的顺序性。Kafka在确定消息应该发送到那个分区是通过消息的Key取Hash决定的。因此在生产消息时指定消息的Key值(可以是订单ID或者业务组合ID),来保证具有相同业务属性的消息被发送到同一个分区。对于全局顺序消息可为主题设置单一分区来解决,但这种方式不适用数据量大的系统,容易造成数据积压。

Zookeeper

  • Controller

    • Controller 是从 Broker 中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生故障时,由 Controller 负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR(In-Sync Replica)集合发生变化时,由控制器负责通知所有 broker 更新其元数据信息。当使用kafka-topics.sh脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
    • Kafka 中 Contorller 的选举的工作依赖于 Zookeeper,成功竞选为控制器的 broker 会在 Zookeeper 中创建/controller这个临时(EPHEMERAL)节点。
    • 选举过程
      • Broker启动的时候尝试去读取/controller节点的brokerid的值;
      • 如果brokerid的值不等于-1,则表明已经有其他的Broker成功成为Controller节点,当前Broker主动放弃竞选;
      • 如果不存在/controller节点,或者brokerid数值异常,当前Broker尝试去创建/controller这个节点,此时也有可能其他broker同时去尝试创建这个节点;
      • 只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败.每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId.
    • 实现
      • Controller读取Zookeeper中节点数据,初始化上下文(Controller Context),并管理节点变化,变更上下文,同事也需要将这些变更信息同步到其他普通的broker节点中.
      • Controller通过定时任务,或者监听器模式获取Zookeeper信息,事件监听会更新上下文信息,Contronller内部也采用生产者-消费者实现模式,Controller将Zookeeper的变动通过事件的方式发送给事件队列,队列就是一个LinkedBlockQueue,事件消费者线程组通过消费时间,将相应的事件同步到各个Broker节点.
    • 职责
      • Controller被选举出来,作为整个Broker集群的管理者,管理所有的集群信息和员数据信息.主要责任包括:
        • 处理 Broker 节点的上线和下线,包括自然下线、宕机和网络不可达导致的集群变动,Controller 需要及时更新集群元数据,并将集群变化通知到所有的 Broker 集群节点;
        • 创建 Topic 或者 Topic 扩容分区,Controller 需要负责分区副本的分配工作,并主导 Topic 分区副本的 Leader 选举。
        • 管理集群中所有的副本和分区的状态机,监听状态机变化事件,并作出相应的处理。Kafka 分区和副本数据采用状态机的方式管理,分区和副本的变化都在状态机内会引起状态机状态的变更,从而触发相应的变化事件。
  • 分区状态机

    • PartitionStateChange:管理Topic的分区,它有以下4中状态:

      • NonExistentPartition:该状态表示分区没有被创建过或创建后被删除了。
      • NewPartition:分区刚创建后,处于这个状态。此状态下分区已经分配了副本,但是还没有选举 leader,也没有 ISR 列表。
      • OnlinePartition:一旦这个分区的 leader 被选举出来,将处于这个状态。
      • OfflinePartition:当分区的 leader 宕机,转移到这个状态。
    • 状态之间的切换

      1
      2
      3
      4
      5
      6
      graph TB
      NonExistentPartition -->|从zk加载分配的replicas|NewPartition
      NewPartition-->|1. 将第一个replica作为leader,其余ISR,更新数据到zk,2. 发送LeaderAndLsr请求给存活的replica,发送UpdateMetaData请求给存活的Broker| OnlinePartition
      OnlinePartition-->|标记分区下线|OfflinePartition
      OfflinePartition-->|1. 选举新的leader更新zk,2. 发送LeaderAndLsr请求给存活的replica,发送UpdateMetaData请求给存活的Broker|OnlinePartition
      OfflinePartition-->|标记分区不存在|NonExistentPartition
  • 副本状态机

    • ReplicaStateChange,副本状态,管理分区副本信息,它也有 4 种状态:

      • NewReplica: 创建 topic 和分区分配后创建 replicas,此时,replica 只能获取到成为 follower 状态变化请求。
      • OnlineReplica: 当 replica 成为 parition 的 assingned replicas 时,其状态变为 OnlineReplica, 即一个有效的 OnlineReplica。
      • OfflineReplica: 当一个 replica 下线,进入此状态,这一般发生在 broker 宕机的情况下;
      • NonExistentReplica: Replica 成功删除后,replica 进入 NonExistentReplica 状态。
    • 状态之间的切换

      1
      2
      3
      4
      5
      6
      graph TD
      NonExistentReplica -->|向新的replica发送LeaderAndLsr请求,告诉它当前Leader和LSR信息并为分区发送UpdateMetaData请求到存活的Broker|NewReplica
      NewReplica-->|如果需要的话,将新的replica添加到assigned replica list| OnlineReplica
      OnlineReplica-->|1.向replica发送StopReplicaRequest请求,2.从LSR中移除当前Replica,想Leader副本发送LeaderAndLsr请求,并为分区发送UpdateMetaData请求到存活的Broker|OfflineReplica
      OfflineReplica-->|向新的replica发送LeaderAndLsr请求,告诉它当前leader和lsr信息并为分区发送UpdateMetaData请求到存活的Broker|OnlineReplica
      OfflineReplica-->|向replica发送StopReplicaRequest|NonExistentReplica