Kafka, as a message queue, has two sides: message producers and message consumers. Consumers that configure the same group.id belong to the same consumer group.
1. Consumer Groups
All consumers that share the same group.id form one consumer group.
1.1 Characteristics of Consumer Groups
- Within a group, each partition is consumed by exactly one consumer
- Consumer groups do not affect one another
- If a consumer's session with the coordinator times out (45s by default) or processing a polled batch takes longer than 5 minutes, a rebalance is triggered (a config sketch follows this list)
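A minimal sketch of how those two thresholds map onto consumer configuration. The values shown are the documented defaults for recent Kafka clients, so treat them as assumptions to verify for your version; the lines slot into the Properties setup used by every example below:
// Rebalance-related timeouts (defaults shown; verify against your client version)
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);    // kicked from the group if no heartbeat reaches the coordinator within 45s
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);  // send a heartbeat every 3s
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // rebalance if poll() is not called again within 5 minutes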
2. Consumer Modes
2.1 Independent Consumer Mode
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
3. Consuming a Specified Partition
Within one consumer group, each partition of a topic can only be consumed by a single consumer.
For example, if a topic has three partitions and the group has three consumers, each consumer ends up consuming exactly one partition of that topic. A consumer can also bypass group assignment entirely and pin itself to a partition with assign(), as the example below does with partition 0.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Assign a specific partition instead of subscribing to the whole topic
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first", 0));
        kafkaConsumer.assign(topicPartitions);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
4. Partition Assignment Strategies
Partitions are assigned to the consumers of a group by one of three main strategies:
- Range
- RoundRobin
- Sticky
4.1 Range (the default)
For example, with a topic of 7 partitions and 3 consumers, Range assigns contiguous blocks: [0,1,2], [3,4], [5,6].
If consumer 1 is removed from the group (e.g. its coordinator session times out, 45s by default, or it exceeds the 5-minute processing limit), a rebalance is triggered: its block [0,1,2] is handed over as a whole, so one surviving consumer ends up consuming [0,1,2,3,4]. This is how Range can produce data skew.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Partition assignment strategy: Range (the default)
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
4.2 RoundRobin
RoundRobin deals partitions out one by one across all consumers: with 7 partitions and 3 consumers the assignment is [0,3,6], [1,4], [2,5]. When a consumer fails, its partitions are redistributed round-robin over the surviving consumers, which avoids the wholesale hand-over that Range suffers from.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Partition assignment strategy: RoundRobin
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
4.3 Sticky Assignment
Sticky produces an assignment that is as balanced as possible, though which partition lands on which consumer can look random: 7 partitions over 3 consumers still split 3/2/2. Its defining property shows on rebalance: when a consumer fails, existing assignments are kept in place as far as possible and only the orphaned partitions are moved, minimizing partition churn.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Partition assignment strategy: Sticky
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
5. Offsets
When a group has no previously committed offset, auto.offset.reset decides where consumption starts. It has three possible values (a config sketch follows the list):
- latest (the default): automatically reset the offset to the latest offset
- earliest: automatically reset the offset to the earliest offset, equivalent to --from-beginning on the console consumer
- none: throw an exception to the consumer if no previous offset is found for the consumer group
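A one-line sketch of setting this, using the same Properties object as the examples in this article:
// Where to start when the group has no committed offset; "latest" is the default
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");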
5.1 Consuming from a Specified Offset
Start consuming from offset 100 on every assigned partition:
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // Make sure the partition assignment plan has completed before seeking
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Seek every assigned partition to offset 100
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
5.2 Consuming from a Specified Time
Idea: look up the offset that corresponds to the target timestamp (offsetsForTimes), then seek to it as in 5.1, so consumption starts from that point in time.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // Make sure the partition assignment plan has completed before seeking
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }

        // Map each assigned partition to the target timestamp (here: 24 hours ago)
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
        }
        // Translate the timestamps into offsets
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

        // Seek each partition to the offset found for its timestamp
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            if (offsetAndTimestamp != null) { // null when no record exists at or after the timestamp
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
5.3 Committing Offsets Automatically
With enable.auto.commit=true (the default), the consumer commits the offsets returned by poll() in the background every auto.commit.interval.ms; here the interval is lowered to 1s.
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // Make sure the partition assignment plan has completed before seeking
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Seek every assigned partition to offset 100
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
        }
    }
}
5.4 Committing Offsets Manually
Disable auto commit and commit explicitly after each batch has been processed:
package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerSelf {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster (bootstrap.servers)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto commit so offsets are committed manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume data
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> System.out.println(record));
            // Commit the batch's offsets manually (synchronous: blocks until acknowledged)
            kafkaConsumer.commitSync();
        }
    }
}
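commitSync() blocks until the broker acknowledges the commit, which adds latency to every loop iteration. The client also provides a non-blocking commitAsync(); a minimal sketch of using it with a callback in place of the commitSync() call above:
// Non-blocking commit; failures are reported to the callback instead of thrown
kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Offset commit failed for " + offsets + ": " + exception);
    }
});
The trade-off: commitAsync() does not retry, so a failed commit may lead to some re-consumption after a restart, whereas commitSync() retries until it succeeds or hits an unrecoverable error.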
6. Duplicate Consumption and Message Loss
Both problems are addressed with transactions end to end:
1. On the producer side, enable idempotence and transactions, set acks=all, and keep the replication factor at 2 or more.
2. On the consumer side, make consumption transactional as well, e.g. MySQL + Spring Boot transaction rollback; a sketch follows.
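A hedged sketch of point 2 using plain JDBC against MySQL instead of Spring Boot; the URL, table names, and schema are hypothetical placeholders. The key idea is that the processed result and the consumed offset are written in the same database transaction, so they either both persist or both roll back; on restart, the consumer would seek() to the offsets stored in the table rather than rely on Kafka's committed offsets. Besides the Kafka imports already used above, this needs java.sql.Connection, java.sql.PreparedStatement, java.sql.SQLException, and org.apache.kafka.clients.consumer.ConsumerRecord:
// Sketch only: one DB transaction per poll batch; tables are placeholders
static void processBatchTransactionally(KafkaConsumer<String, String> consumer,
                                        Connection conn) throws SQLException {
    conn.setAutoCommit(false);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    try {
        for (ConsumerRecord<String, String> record : records) {
            // 1. Write the business result
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO results(v) VALUES (?)")) {
                ps.setString(1, record.value());
                ps.executeUpdate();
            }
            // 2. Store the offset of the NEXT record in the SAME transaction
            try (PreparedStatement ps = conn.prepareStatement(
                    "REPLACE INTO offsets(topic, part, off) VALUES (?, ?, ?)")) {
                ps.setString(1, record.topic());
                ps.setInt(2, record.partition());
                ps.setLong(3, record.offset() + 1);
                ps.executeUpdate();
            }
        }
        conn.commit();   // result and offset persist atomically
    } catch (SQLException e) {
        conn.rollback(); // neither the result nor the offset is saved
        throw e;
    }
}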
7. Data Backlog (Increasing Throughput)
1. Increase the topic's partition count and, at the same time, the number of consumers in the group.
2. Increase the number of records pulled per poll from the default of 500 to, say, 1000 (see the config sketch below).
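A sketch of point 2 as consumer configuration. max.poll.records defaults to 500; the byte-level fetch ceilings may also need raising so larger batches are not silently capped. The default values quoted in the comments are assumptions to verify for your client version:
// Pull up to 1000 records per poll() instead of the default 500
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// Byte ceilings that can cap a batch: fetch.max.bytes (default 50 MB) and
// max.partition.fetch.bytes (default 1 MB); raise the latter if a single
// partition must deliver the whole batch
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760);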