Kafka Series (3): Consumers

As a message queue, Kafka has two main roles: message producers and message consumers. Consumers that share the same group.id form one consumer group.

1. Consumer Groups

Consumers with the same group.id belong to the same consumer group.

1.1 Characteristics of Consumer Groups

  • A partition can be consumed by only one consumer within a group
  • Consumer groups do not affect one another
  • If a consumer fails to heartbeat to the coordinator for 45 s, or takes more than 5 minutes to process a batch, a rebalance is triggered (see the config sketch after this list)
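
The two timeouts behind the last point are ordinary consumer configs. A minimal sketch of setting them explicitly on the Properties object used in the examples below (the values shown are the defaults in recent Kafka versions):

properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);     // heartbeat session timeout: 45 s
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);  // max gap between poll() calls: 5 min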

2. Consumer Modes

2.1 Standalone Consumer Mode

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

3. Consuming a Specified Partition

Each partition of a topic can be consumed by only one consumer within the same consumer group.

For example, if a topic has three partitions and the group also has three consumers, each consumer in the group consumes exactly one partition of that topic. The example below goes a step further and uses assign() to bind the consumer directly to partition 0, bypassing group subscription.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Assign a specific topic partition (no group subscription)
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first", 0));
        kafkaConsumer.assign(topicPartitions);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

4. Partition Assignment Strategies

A consumer group's partitions are assigned by one of the following three strategies:

  • Range
  • RoundRobin
  • Sticky

4.1 Range (the default)

For example, suppose a topic has 7 partitions and the consumer group has 3 consumers. Under the Range strategy, the assignment is [0,1,2], [3,4], [5,6].

If consumer 1 then stops heartbeating for 45 s, or spends more than 5 minutes processing, it is removed from the group and a rebalance is triggered. Its partitions [0,1,2] are handed wholesale to consumer 2, which ends up consuming [0,1,2,3,4]; this causes data skew.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /* Assignment strategy: Range */
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

4.2 RoundRobin

For example, with 7 partitions and 3 consumers, RoundRobin deals the partitions out one by one, giving [0,3,6], [1,4], [2,5]. When one of the consumers fails, its partitions are redistributed round-robin across the remaining consumers.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /* Assignment strategy: RoundRobin */
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

4.3 Sticky (Sticky Partition Assignment)

Sticky produces a balanced but not necessarily contiguous assignment: with 7 partitions and 3 consumers, each consumer still gets 2 or 3 partitions, but which ones can look random. On a rebalance, it keeps the surviving consumers' existing partitions in place ("sticky") and only redistributes the failed consumer's partitions.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPart {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        /* Assignment strategy: Sticky */
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

5. Consuming by Offset

Where consumption starts when no committed offset is available is controlled by auto.offset.reset, which has three possible values (a config sketch follows the list):

  • latest (the default): automatically reset the offset to the latest offset, i.e. start from newly arriving records
  • earliest: automatically reset the offset to the earliest offset, equivalent to --from-beginning
  • none: throw an exception to the consumer if no previous offset is found for the consumer's group
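
The reset behavior is chosen with a single line added to the consumer Properties used throughout this article ("earliest" here is just an example value):

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest" (default), "none"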

5.1 Consuming from a Specified Offset

The example below seeks every assigned partition to offset 100 before consuming.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        // Make sure the partition assignment has completed
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Seek every assigned partition to offset 100
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

5.2 Consuming from a Specified Time

The idea: look up the offset that corresponds to the target timestamp, then apply the seek() approach from 5.1 to start consuming from that point.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval (ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        // Make sure the partition assignment has completed
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Map each assigned partition to the target timestamp (here: one day ago)
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000L);
        }
        // Convert the timestamps into offsets
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap =
                kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
        // Seek each partition to the offset found for its timestamp
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            if (offsetAndTimestamp != null) { // null if there is no record at or after the timestamp
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

5.3 Auto-Committing Offsets

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSet {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Enable auto commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval (ms)
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        // Make sure the partition assignment has completed
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        // Seek every assigned partition to offset 100
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

        // 3. Consume records; offsets are committed automatically every second
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
        }
    }
}

5.4 Manually Committing Offsets

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerSelf {
    public static void main(String[] args) {
        // 0. Configuration
        Properties properties = new Properties();
        // Connect to the cluster: bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // Deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto commit: offsets are committed manually below
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 1. Create a consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 2. Subscribe to a topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3. Consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(cc -> System.out.println(cc));
            // Manually commit the offsets (blocking)
            kafkaConsumer.commitSync();
        }
    }
}
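
commitSync() blocks until the broker acknowledges the commit. When that latency matters, the consumer API's non-blocking commitAsync() can be used inside the loop, typically with one final synchronous commit on shutdown. A minimal sketch of that pattern, reusing kafkaConsumer from the example above:

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
        consumerRecords.forEach(cc -> System.out.println(cc));
        // Non-blocking commit; a later commit supersedes an earlier failed one
        kafkaConsumer.commitAsync();
    }
} finally {
    // One last blocking commit so the final position is not lost on shutdown
    kafkaConsumer.commitSync();
    kafkaConsumer.close();
}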

6. Duplicate Consumption and Message Loss


Both problems are addressed with a transactional approach on each side:
1. The producer enables idempotence and transactions, uses acks=all, and the topic has a replication factor of at least 2.
2. The consumer also processes records transactionally, e.g. a MySQL transaction under Spring Boot that rolls back on failure (a minimal sketch follows below).
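
As an illustration of point 2, below is a minimal sketch that writes the processing result and the consumed offset in one MySQL transaction over plain JDBC, so both are committed or rolled back together. The class name, JDBC URL, credentials, and the consumed_messages / consumer_offsets tables are all hypothetical; a complete version would also restore positions from consumer_offsets with seek() on startup.

package kafka.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CustomConsumerTx {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Offsets live in MySQL, so Kafka's own commit is disabled
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        // Hypothetical JDBC URL, credentials and tables
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pwd")) {
            conn.setAutoCommit(false);
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        // 1. Write the business result
                        try (PreparedStatement ps = conn.prepareStatement(
                                "INSERT INTO consumed_messages (msg_value) VALUES (?)")) {
                            ps.setString(1, record.value());
                            ps.executeUpdate();
                        }
                        // 2. Record the next offset to read, in the SAME transaction
                        try (PreparedStatement ps = conn.prepareStatement(
                                "REPLACE INTO consumer_offsets (topic, part, next_offset) VALUES (?, ?, ?)")) {
                            ps.setString(1, record.topic());
                            ps.setInt(2, record.partition());
                            ps.setLong(3, record.offset() + 1);
                            ps.executeUpdate();
                        }
                    }
                    conn.commit(); // result and offset land together, or not at all
                } catch (Exception e) {
                    conn.rollback();
                    // A production version would also seek() each partition back to
                    // the offset stored in consumer_offsets before polling again.
                }
            }
        }
    }
}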

7. Data Backlog (How to Raise Throughput)

1. Increase the number of partitions of the topic and, at the same time, the number of consumers in the group.
2. Increase the number of records fetched per poll, e.g. from the default 500 to 1000 (config sketch below).
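
A hedged sketch of the fetch-side settings for point 2, added to the consumer Properties used throughout this article (the values are examples; the comments note the defaults):

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);    // records returned per poll(), default 500
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // max bytes per fetch request, default 50 MB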