Kafka生产者-分区器

tech2023-12-28  69

Kafka默认使用Hash分区策略,即根据消息的key进行分区分配:hash(key) % numPartitions。key相同的消息其hash值也相同,因此会被分配到同一个分区中;如果消息没有key,则通过一个按topic维护的计数器以轮询方式选择分区。

源代码org.apache.kafka.clients.producer.internals.DefaultPartitioner

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    /** One round-robin counter per topic, created lazily by {@link #nextValue(String)}. */
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Fetch the partition metadata for the topic.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // Total number of partitions of the topic.
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key: take the next per-topic counter value and reduce it modulo the
            // number of available partitions, if there are any.
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Key present: the murmur2 hash of the serialized key, modulo the partition
            // count, selects the partition.
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    /**
     * Returns the current counter value for the topic and increments it.
     * The counter is created on first use, seeded with a random int.
     */
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            // If another thread registered a counter first, use that one instead.
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}

自定义分区器Java实现

package com.example.kafkaproducer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Custom partitioner: records without a key are spread over all partitions of the
 * topic using a counter; records with a key go to the partition selected by the
 * murmur2 hash of the serialized key.
 *
 * @author Admin
 */
public class DefinePartitioner implements Partitioner {

    // Single counter shared by every topic this partitioner instance handles.
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Chooses the partition for a record.
     *
     * @param topic      the topic name
     * @param key        the record key (unused; only {@code keyBytes} is inspected)
     * @param keyBytes   the serialized key, or null if the record has no key
     * @param value      the record value (unused)
     * @param valueBytes the serialized value (unused)
     * @param cluster    the current cluster metadata
     * @return a partition number in {@code [0, numPartitions)}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Fetch the partition metadata for the topic.
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        // Total number of partitions of the topic.
        int numPartitions = partitionInfos.size();
        if (null == keyBytes) {
            // No key: counter value modulo the partition count. toPositive keeps the
            // result non-negative after the counter wraps past Integer.MAX_VALUE;
            // a raw negative value would yield a negative (invalid) partition number.
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
        } else {
            // Key present: hash of the serialized key modulo the partition count.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

自定义分区器的使用

// Select the custom partitioner by passing the fully-qualified class name of
// DefinePartitioner under the producer's partitioner-class configuration key.
// NOTE(review): `properties` is presumably the config object later used to build the producer — confirm.
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefinePartitioner.class.getName());
最新回复(0)