Kafka 生产者分区策略详解

Posted by 陈树义 on 2020-12-16

我们都知道 Kafka 的一个 Topic 有多个 Partition,那么生产者将消息丢入消息队列的时候,到底是使用什么策略放入哪个 Partition 的呢?

Kafka 的分区策略

我们知道向 Kafka 消息队列发送消息都是通过 KafkaProducer.send() 方法实现的。其实 Kafka 的分区策略就藏在 send() 方法中。通过跟踪 send 方法,发现 KafkaProducer 是通过内部的私有方法 doSend 来发送消息的,里面有一行代码:

int partition = partition(record, serializedKey, serializedValue, cluster);

这行代码的功能其实就是选择分区,partition 方法的代码逻辑如下:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
   Integer partition = record.partition();
   return partition != null ?partition :partitioner.partition(record.topic(),   record.key(), serializedKey, record.value(), serializedValue, cluster);
}

从上面的逻辑我们可以看出:如果 record 指定了分区策略,则指定的分区策略会被使用。如果没有指定分区策略,就使用默认的 DefaultPartitioner 分区策略。我们可以在创建 KafkaProducer 时传入 Partitioner 的实现类来实现自定义分区。

默认的分区策略

Kafka 的默认分区策略可以分为两种情况:消息 Key 为 null、消息 Key 不为 null。这里说的 key 就是我们将消息丢入 Kafka 时传入的一个参数。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // key 为空的情况
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // key 不为空的情况
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

消息 Key 为 null

如果 key 为 null,则先根据 topic 名获取上次计算分区时使用的一个整数并加一。

接着判断 topic 的可用分区数是否大于 0,如果大于 0 则使用获取的 nextValue 的值和可用分区数进行取模操作。 如果 topic 的可用分区数小于等于 0,则用获取的 nextValue 的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。

int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
    int part = Utils.toPositive(nextValue) % availablePartitions.size();
    return availablePartitions.get(part).partition();
} else {
    // no partitions are available, give a non-available partition
    return Utils.toPositive(nextValue) % numPartitions;
}

消息 Key 不为 null

如果消息 Key 不为 null,就是根据 hash 算法 murmur2 就算出 key 的 hash 值,然后和分区数进行取模运算。

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

总结

对于 Kafka 生产者来说,如果指定了分区策略类,那么会按照分区策略类执行。如果不手动指定分区选择策略类,则会使用默认的分区策略类(DefaultPartitioner)。

在默认分区策略下,如果不指定消息的 key,则消息发送到的分区是随着时间不停变换的。
如果指定了消息的 key,则会根据消息的 hash 值和 topic 的分区数取模来获取分区的。

如果应用有消息顺序性的需要,则可以通过指定消息的 key 和自定义分区类来将符合某种规则的消息发送到同一个分区。同一个分区消息是有序的,同一个分区只有一个消费者就可以保证消息的顺序性消费。

参考资料