我们都知道 Kafka 的一个 Topic 有多个 Partition,那么生产者将消息丢入消息队列的时候,到底是使用什么策略放入哪个 Partition 的呢?
Kafka 的分区策略
我们知道向 Kafka 消息队列发送消息都是通过 KafkaProducer.send()
方法实现的。其实 Kafka 的分区策略就藏在 send()
方法中。通过跟踪 send 方法,发现 KafkaProducer 是通过内部的私有方法 doSend 来发送消息的,里面有一行代码:
int partition = partition(record, serializedKey, serializedValue, cluster);
这行代码的功能其实就是选择分区,partition 方法的代码逻辑如下:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
从上面的逻辑我们可以看出:如果 record 指定了分区策略,则指定的分区策略会被使用。如果没有指定分区策略,就使用默认的 DefaultPartitioner 分区策略。我们可以在创建 KafkaProducer 时传入 Partitioner 的实现类来实现自定义分区。
默认的分区策略
Kafka 的默认分区策略可以分为两种情况:消息 Key 为 null、消息 Key 不为 null。这里说的 key 就是我们将消息丢入 Kafka 时传入的一个参数。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// key 为空的情况
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// key 不为空的情况
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
消息 Key 为 null
如果 key 为 null,则先根据 topic 名获取上次计算分区时使用的一个整数并加一。
接着判断 topic 的可用分区数是否大于 0,如果大于 0 则使用获取的 nextValue 的值和可用分区数进行取模操作。 如果 topic 的可用分区数小于等于 0,则用获取的 nextValue 的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
消息 Key 不为 null
如果消息 Key 不为 null,就是根据 hash 算法 murmur2 就算出 key 的 hash 值,然后和分区数进行取模运算。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
总结
对于 Kafka 生产者来说,如果指定了分区策略类,那么会按照分区策略类执行。如果不手动指定分区选择策略类,则会使用默认的分区策略类(DefaultPartitioner)。
在默认分区策略下,如果不指定消息的 key,则消息发送到的分区是随着时间不停变换的。
如果指定了消息的 key,则会根据消息的 hash 值和 topic 的分区数取模来获取分区的。
如果应用有消息顺序性的需要,则可以通过指定消息的 key 和自定义分区类来将符合某种规则的消息发送到同一个分区。同一个分区消息是有序的,同一个分区只有一个消费者就可以保证消息的顺序性消费。