探秘 Kafka 高可用:副本复制机制

Posted by 陈树义 on 2019-01-09

我们在 消息队列的三大重点:幂等性、有序性、可靠性 - 陈树义的博客 中说到,消息可靠性是 Kafka 中一个非常重要的点。而要保证消息可靠性,kafka 的副本复制机制是非常重要的。

我们都知道 Kafka 是分为 topic 和 partition 的,一个 topic 有多个 partition。而副本复制机制就是每个 partition 有多个副本 partition,每个副本 partition 定期从主 partition 备份数据。通过这种方式可以保证主 partition 数据丢失后,可以从副 partition 中恢复数据。

旧版本的消息存储

在 Kafka 0.8 版本之前,如果我们要使用 Kafka 消息队列来实现上面所说的功能解耦。为了让大家更好地理解 Kafka 的副本复制机制,我们创建 Topic A 至 D 代表各个类型的消息,每个 Topic 使用 3 个 Partition 来保存数据。此时,Kafka 的架构图如下所示:

image.png

在上图中,我们使用 3 台 Kafka 服务器来保存 6 个 Topic 的所有消息。其中每个 Kafka 服务器各存储两个 Topic 类型的消息,每个 Topic 有 3 个消息分区。

但上图的存储架构有一个致命的问题:如果其中一台 Kafka 服务器宕机了,那么这台机器上的所有消息将不可用,并且在宕机期间所有业务系统将无法写入消息。例如:Topic A 所在的机器宕机了,那么 Topic A 和 Topic B 的数据都不可读了。也就是说在 Kafka 0.8 版本之前,Kafka 集群是无法实现高可用的。 这对于海量请求的互联网应用来说,这几乎是无法承受的损失。

也正是因为这个原因,所以 Kafka 在初期的时候一般用于日志收集这类对数据安全和高可用要求较低的地方。

新版本的消息存储

为了进一步提高 Kafka 的可用性,Kafka 在 0.8 版本时提供了 High Availability 机制(高可用机制)。通过副本复制机制的实现,即使一个或多个 Kafka 服务器宕机,也能正常提供服务。

Kafka 推出的副本复制机制指的是以 Topic 的 Partition 为基本单位,针对每一个 Partition 创建指定份数的副本,并存储在 Kafka 集群的其他 Kafka 服务器中。 针对每一个 Partition,根据 Zookeeper 的选举算法,选出某个 Partition 作为 Leader。所有针对该 Partition 的读写操作都与该 Partition Leader 进行交互,其他副本则定时从 Leader 中拉取数据同步。同时,为了避免副本上的数据落后 Leader 太多,Leader 会维护一个名为 ISR(In-Sync Replica)的列表。如果一个副本落后 Leader 太多,那么它将会从 ISR 中被剔除。

对于我们上面 6 个 Topic 的场景,默认每个 topic 设置 2 个副本。如果我们使用 Kafka 0.8 版本之后提供的副本机制,那么此时 Kafka 的架构图如下所示:

image.png

从上图可以看出:与 0.8 版本之前的存储方式相比,0.8 版本之后的数据存取更加分散。每个 Partition 的数据均匀地散布在各台机器上,例如 Topic A Partition 1 存储在第一个 Kafka 服务器 上,它作为该 Partition 的 Leader,负责数据的读写。而 Topic A Partition 1 的两个副本分别存在另外两台 Kafka 服务器上,不断地从第一台服务器的 Topic A Partition 1 上拉取数据。如果发生意外造成 Kafka Server A 宕机,那么另外两台机器上的 Topic 分区副本将会选区出一个作为 Leader 继续对外提供服务。

在上面说到的 Kafka 架构中,有两个参数是比较重要的:

  • num.partitions :设置每个Topic的默认分区数。这个参数决定了每个 Topic 将会有几个分区,也决定了这个 Topic 的消费速度,因为一个 Partition 只能被一个线程消费。所以如果我们针对 Order 这个 Topic 只创建了一个 Partition,那么所有的订单消息都会推送到这个 Partition 中,而我们只能启动一个线程消费所有这些消费,这样就会导致消费速度很慢。但如果我们设置多个分区,那么这些消息就会均匀地散步在各个 Partition 中,我们也可以启动多个线程并发地消费多个 Partition 中的消息,从而加快消费速度。在上面的架构图中,我们设置了每个 Topic 的默认分区数是 3,所以你会看到每个 Topic 有三个分区。
  • default.replication.factor :设置每个分区的默认副本数。该数值不能大于集群的服务器个数,因为大于集群服务器个数就没意义了。在上面的架构图中,我们设置每个分区的默认副本数是 2,所以你可以看到每个分区在另外两个 Kafka 服务器上都有相应的副本。

Kafka 用副本复制机制实现了数据的高可用,从而避免了服务器宕机带来的数据损失。但 Kafka 的副本机制究竟是如何工作的呢?每个副本又是根据什么算法分配到 Kafka 服务器上的呢?

副本分配算法

为了更好地做负载均衡,Kafka 的分配算法尽量将所有的 Partition 均匀分配到整个集群上。同时为了提高 Kafka 的容错能力,也需要将同一个 Partition 的 Replica 尽量分散到不同的机器。因为如果所有的 Replica 都在同一个 Broker 上,那一旦该 Broker 宕机,该 Partition 的所有 Replica 都无法工作,也就达不到 HA 的效果。

所以为了达到 HA 的效果,Kafka 分配 Replica 的算法如下:

  • 将所有 Broker(假设共n个Broker)和待分配的Partition排序
  • 将第 i 个 Partition 分配到第(i mod n)个 Broker 上
  • 将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mod n)个 Broker 上

对于上面的例子,我们 Replica 的分配演示如下:

按照算法第一步:将所有 Broker(假设共n个Broker)和待分配的Partition排序

  • 将 Broker 按顺序排开,将待分配的 Partition 排开,分别是:Topic A Partition 1、Topic A Partition 2、Topic A Partition 3、Topic B Partition 1、Topic B Partition 2、Topic B Partition 3……Topic F Partition 3。

接着第二步:将第 i 个 Partition 分配到第(i mod n)个 Broker 上,这里有 3 个 broker,因此 n = 3。

  • 将第 1 个 Partition(Topic A Partition 1)分配到第 1 个 Broker 上(1 mod 3)
  • 将第 2 个 Partition(Topic A Partition 2)分配到第 2 个 Broker 上(2 mod 3)
  • 将第 3 个 Partition(Topic A Partition 3)分配到第 0 个 Broker 上(3 mod 3)
  • 将第 4 个 Partition(Topic B Partition 1)分配到第 1 个 Broker 上(4 mod 3)
  • ……
  • 将第 18 个 Partition(Topic F Partition 3)分配到第 0 个 Broker 上(18 mod 3)

接着第三步:将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mod n)个 Broker 上

  • 将第 1 个 Partition 的第 1 个 Replica 分配到第 2 个 Broker 上((1+1)mod 3)。
  • 将第 1 个 Partition 的第 2 个 Replica 分配到第 0 个 Broker 上((2+1)mod 3)。
  • 将第 2 个 Partition 的第 1 个 Replica 分配到第 0 个 Broker 上((2+1)mod 3)。
  • 将第 2 个 Partition 的第 2 个 Replica 分配到第 1 个 Broker 上((2+2)mod 3)。
    ……

按照上面所描述的算法,我们可以画出如下图所示的 6 个 Topic 的副本存储架构图(下面的图和上面的图是一样的)。每个 Partition 均匀地散步在各个 Kafka 服务器上,而且每个 Kafka 服务器上的副本数量也大致相当。

image.png

副本同步机制

Producer 在发布消息到某个 Partition 时,先通过 Zookeeper 找到该 Partition的 Leader。无论该 Topic 的 Replication Factor 为多少(即该 Partition 有多少个 Replica),Producer 只将该消息发送到该 Partition Leader。

Partition Leader 会将该消息写入内存,之后等待刷盘线程写入磁盘。之后每个 Partition Follower 都从 Partition Leader 拉取数据。通过这种数据同步方式,Partition Follower 上数据的存储顺序与 Partition Leader 保持一致。

Partition Follower 在收到该消息并写入其 Log 后,向 Partition Leader 发送 ACK。一旦 Partition Leader 收到了 ISR 中的所有副本的 ACK,该消息就被认为已经 commit。Partition Leader 将增加 HW(High Water,水位) 并且向 Producer 发送 ACK。

为了提高性能,每个 Follower 在接收到数据后就立马向 Leader 发送 ACK,而非等到数据写入 Log 中。因此对于已经 commit 的消息,Kafka 只能保证它被存在于多个副本的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer 消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。

而在 Kafka 的副本同步机制中,针对不同的使用场景,Producer 可以通过配置 request.required.acks 来选择不同的数据一致性等级。

  • request.required.acks = 0 表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消息。在这种情况下,如果 Leader 分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
  • request.required.acks = 1 表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同步完成。所以如果在消息已经被写入 Leader 分片,但是还未同步到 Follower 节点,此时Leader 分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
  • request.required.acks = -1 表示 Producer 等待来自 Leader 和所有 Follower 的 ACK 确认之后,才发送下一条消息。在这种情况下,无论什么情况都不会发生消息的丢失,除非所有 Follower 节点都宕机了。

上面三种不同值的设置,性能依次递减,但数据健壮性则依次递增。

Leader选举问题

假设当 Partition Leader 所在服务器宕机了,那如何在剩下的 Partition Follower 中选举出新的 Partition Leader?因为 Partition Follower 可能落后许多或者其所在机器也宕机了,所以必须确保选择「最新」的 Partition Follower 作为新的 Partition Leader。

一个基本的原则是:如果 Partition Leader 不在了,新的 Partition Leader 必须拥有原来的 Partition Leader commit 过的所有消息。 这就需要作一个折中,如果 Partition Leader 在标明一条消息被 commit 前等待更多的 Partition Follower 确认,那在它宕机后就有更多的 Partition Follower 可以作为新的 Partition Leader。但这就意味着 Partition Leader 必须每次等待更长时间,这就意味着系统吞吐率的下降。  

Majority Vote 算法

其实 Leader 选举问题普遍存在与各种分布式中间件中,其中一种非常常用的 Leader 选举方式是 Majority Vote(少数服从多数)。Majority Vote 方式指的是:如果我们有 2f+1 个副本(包含Leader和Follower),那在 commit 之前必须保证有 f+1 个副本复制完消息。同时为了保证正确选出新的 Leader,失败的副本数不能超过 f 个(因为超过了 f 个失败,那么就无法达到投票成立的 f+1 张票)。

Majority Vote 这种方式有个很大的优势,系统的延迟只取决于最快的几个Broker,而非最慢那个。但 Majority Vote 也有一些劣势,为了保证 Leader 选举的正常进行,它所能容忍失败的 follower 个数比较少。如果要容忍 1 个 follower 挂掉,必须要有 3 个以上的副本。如果要容忍 2 个 follower 挂掉,必须要有 5 个以上的 副本。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的副本。而大量的副本又会在大数据量下导致性能的急剧下降。

所以 Majority Vote 一般用在存储少量数据的分布式系统中(例如 Zookeeper),而不用在需要存储大量数据的分布式系统中。因为如果需要存储大量数据,而为了保证高可用又必须存在大量的副本,那副本之间的数据同步就成为一个头疼的问题。

PacificA 算法

事实上,除了 Majority Vote 之外,还有其他很多其他 Leader 选举算法。例如 Zookeeper 的 Zab 选举算法、Raft 选举算法、Viewstamped Replication。而 Kafka 所使用的 Leader Election 算法更像微软的 PacificA 算法。

Kafka 在 Zookeeper 中动态维护了一个 ISR 列表(In-Sync Replicas 保持同步的副本集)。这个 ISR 里的所有副本都跟上了 leader,只有 ISR 里的副本才有被选为 Leader的可能。在这种模式下,对于 f+1 个副本的情况,1 个 Partition 能够在保证不丢失已经 commit 的消息的前提下容忍 f 个副本的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍 f 个副本的失败,Majority Vote 和 ISR 在 commit 前需要等待的副本数量是一样的,但是 ISR 需要的总的副本的个数几乎是 Majority Vote 的一半。

所有副本都不工作

在 ISR 中至少有一个 Partition 副本时,Kafka 可以确保已经 commit 的数据不丢失。但如果某个 Partition 的所有副本都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 方案一:等待 ISR 中的任一个副本「活」过来,并且选它作为 Partition Leader
  • 方案二:选择第一个「活」过来的副本(不一定是ISR中的)作为 Partition Leader。

这就需要在可用性和一致性当中作出一个简单的折中。如果一定要等待 ISR 中的副本「活」过来,那不可用的时间就可能会相对较长。而且如果 ISR 中的所有副本都无法「活」过来了,或者数据都丢失了,这个 Partition 将永远不可用。

而选择第一个「活」过来的副本作为 Leader,而这个副本并不是 ISR 中的副本。那可能会丢失一些数据,但是不可用的时间相对较短。这种情况下,它会成为 Partition Leader 并作为consumer 的数据源。

目前 Kafka0.8.* 使用了第二种方式(第一个活过来的作为副本),即 Kafka 默认选择了高可用。据悉后续 Kafka 将支持自定义配置,让用户根据不同的使用场景选择高可用性还是强一致性。