Kafka分区策略
创始人
2024-01-29 21:46:27
0

默认分区器DefaultPartitioner

(1)指明partition的情况下,直
接将指明的值作为partition值;

(2)没有指明partition值但有key的情况下,将key的hash值与topic的
partition数进行取余得到partition值;

例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那
么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。

(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直
使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进
行使用(如果还是0会继续随机)。

自定义分区器

    1. 实现接口 Partitioner
    1. 实现 3 个方法:partition,close,configure
    1. 编写 partition 方法,返回分区号
      在这里插入图片描述
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** 1. 实现接口 Partitioner* 2. 实现 3 个方法:partition,close,configure* 3. 编写 partition 方法,返回分区号*/public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区** @param topic      主题* @param key        消息的 key* @param keyBytes   消息的 key 序列化后的字节数组* @param value      消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster    集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取消息String msgValue = value.toString();//创建partitionint partition;//判断消息是否包含silenceif (msgValue.contains("silence")) {partition = 0;} else {partition = 1;}//返回分区号return partition;}//关闭资源@Overridepublic void close() {}//配置方法@Overridepublic void configure(Map configs) {}
}

使用分区器的方法,在生产者的配置中添加分区器参数
在这里插入图片描述

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallbackPartitions {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itsp.kafka.producer.MyPartitioner");// 3. 创建 kafka 生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);// 4. 调用 send方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "call of silence" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e == null) {System.out.println("主题 : " + metadata.topic() + "->" + "分区" + metadata.partition());} else {e.printStackTrace();}}});}// 5. 关闭资源kafkaProducer.close();}
}

测试

开启 Kafka 消费者。

在这里插入图片描述

启动生产者,在 IDEA 控制台观察回调信息。

在这里插入图片描述

相关内容

热门资讯

微信聊天记录删了怎么恢复找回来...   相信很多人因为换手机和手机内存不足的情况下可能就会误删微信里面的聊天记录,其实有些聊天记录对我们...
白条怎么协商延期还款处理?逾期...   现在逾期是生活中经常发生的事情,在申请贷款后,要按照规定的时间进行还款,但是有很多人到还款的时候...
养老年金保险的优点与缺点?45...   养老保险是当下非常热门的一种理财保险,养老保险能直接蛮子养老需求,是大部分人都所需要的,但是没有...
急用钱征信不好哪里可以贷,支付...   现在的借款平台是越来越了,逾期的人也越来越普遍了,平台上最看重的就是个人的征信,征信好的话借钱就...
封闭理财到期自动到账吗?封闭理...   在我们选择理财的时候,很多用户都会选择流动性好的产品,一些会选择收益高的理财产品。每个人的需求是...
信用卡逾期多久会上征信?信用卡...   信用卡在生活中使用的次数也是非常多的,我们可以使用信用卡借钱贷款,但是有很多人在上面借钱贷款逾期...
公积金贷款有什么好处?公积金贷...   随着生活质量的不断提高,很多人都开始买属于自己的房子,大家买房都会选择使用公积金贷款的方式来买房...
微信零钱通有风险吗?微信零钱通...   相信大家都在微信里见到零钱通,其实它是一个理财产品,用户把钱转入零钱通确认之后即可产生收益,而且...
公积金贷款要满足哪些条件?公积...   现在很多人都是通过公积金贷款来购买房子,这样给自己带来很多便捷,公积金贷款的话利率也是相对低的,...
信用卡逾期5万多会坐牢吗? 信...   现在相信大家都应该有信用卡,现在信用卡逾期的情况是非常多的,银行对信用卡的管控是非常严格的,如果...