Kafka系列(一)Kafka简介

一、Kafka的简介

Kafka:是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

发布/订阅:消息发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的主题(topic),订阅着只接收感兴趣(topic)的消息

常用例子:数据埋点:记得购买者的行为数据(浏览、点赞、收藏、评论等)

架构能力Kafka系列(一)Kafka简介插图

二、消息队列的作用:

  • 缓存/削峰
  • 解耦(例如下单、扣库存、扣款)
  • 异步通讯

三、消息队列的两种模式

  • 点对点模式:消费者主动拉取数据,消息收到后清除消息
  • 发布/订阅模式:可以有多个topic主题;消费者消费数据后,不删除数据;每个消费者相互独立,都可以消费数据
架构能力Kafka系列(一)Kafka简介插图1

四、Kafka的集群模式

为了提高吞吐量,Kafka都会启用集群模式,并且每个主题多个分区多个副本。

多个分区:为了提高吞吐量;多个副本:为了保障数据安全

架构能力Kafka系列(一)Kafka简介插图2

主要在2.8后,Zookeeper将会被舍弃。使用Kraft

五、Kafka的安装(Zookeeper模式)

5.1环境准备

准备3台服务器,并且修改hostmane

5.2三台服务器安装Zookeeper

5.3下载Kafka(所用服务器)

https://kafka.apache.org/

下载Kafka的压缩包

5.3.1解压文件

shell>unzip kafka.zip /data/app/kafka

5.3.2 修改server.properties文件

shell>vim /data/app/kafka/config/server.properties
####修改broker.id(此ID不能重复),例如第一台服务器0,第二台设置成1....
borker.id=0
#####修改kafka日志保存位置
log.dirs=/data/app/kafka/logs
#####修改zookeeper.connect。例如三台服务器的hostname:hadoop102/hadoop103/hadoop104
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5.3.3设置环境变量

shell>vim /etc/profile.d/my_env.sh

###KAFKA_HOME
export KAFKA_HOME=/data/app/kafka
export PATH=$KAFKA_HOME/bin

shell>source /etc/profile

6、启动kafka

shell>cd /data/app/kafka
shell>bin/kafka-server-start.sh -deamon config/server.properties

启动完成

六、创建批量开启和关停脚本

shell>cd /data/app/kafka
shell>vim kfk.sh
####content
#!/bin/sh

case $1 in
"start")
  for i in hadoop102 hadoop102 hadoop104
  do
    echo '----启动  $i kafka'
    ssh $i "/data/app/kafka/bin/kafka-server-start.sh -daemon /data/app/kafka/config/server.properties"
  done
;;
"stop")
  for i in  hadoop102 hadoop103 hadoop104
  do
    echo "---停止 $i kafka ---"
    ssh $i "/data/app/kafka/bin/kafka-server-stop.sh -daemon /data/app/kafka/config/server-properties"
  done
;;
esac
####脚本结束
shell>chome 777 kfk.sh

6.1 批量启动命令

shell>kfk.sh start

6.2 批量关停命令

shell>kfk.sh stop

七、Kafka的主题Topic

7.1 创建主题Topic

  • –topic:主题名
  • –partitions:分区(提升性能,提升效率)
  • –relication:副本数(数据安全,备份防丢失)
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadop103:9092,hadoop104:9092 --topic topicName --create --partitions 3 --replication-factor 3

7.2 查看主题列表

shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop1049092 --list

7.3查看主题的详细明细信息

shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic topicName --describe
  • PartitionCount:副本总数
  • Partition:从0开始
  • Leader:哪个Broker作为主方
  • Replicas:副本数
架构能力Kafka系列(一)Kafka简介插图3

例如上次TopicA有三个分区,三个副本都分别存在在不同的Broker。

通过上图可以看出:TopicA的“leader”:0,“isr”:[0,2],即broker的TopicA-Partition0作为主方,broker2的TopicA-Partition0作为备用方

7.4修改分区(分区只能增加不能减少)

shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic topicName --alter --partitions 4

即:把副本书改成4

7.5修改副本数

八、kafka的生产者

8.1、命令模式生产消息

shell>cd /data/app/kafka
shell>bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName
>>hello
###即生产了一个“hello”消息

8.2、Maven项目调用Kafka生产消息

新建一个maven项目

8.2.1 引入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
  </dependency>
</dependencies>

8.2.2 普通异步发送

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducer {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2 发送数据
        for(int i=0 ;i <5; i++){
            kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图4

8.2.3 带回调的异步发送

package kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducerCallback {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2 发送数据
        for(int i=0 ;i <5; i++){
            kafkaProducer.send(new ProducerRecord<>("first", "fox" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    
                    if(e==null){
                        System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
                    }
                }
            });
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图5

8.2.4 同步发送

在普通异步发送的基础上,增加.get()

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomeProducerSync {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2 发送数据 同步发送的话,只要后面追加.get()
        for(int i=0 ;i <5; i++){
            try {
                kafkaProducer.send(new ProducerRecord<>("first","fox"+i)).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图6

九、Kafka的分区策略

9.1 常用分区策略

架构能力Kafka系列(一)Kafka简介插图7
package kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducerPartCallback {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2 发送数据 可以指定分区,如果没有指定,会根据Key的Hashcode取余
        for(int i=0 ;i <5; i++){

            kafkaProducer.send(new ProducerRecord<>("first", "stock" ,"fox" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                    if(e==null){
                        System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
                    }
                }
            });
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图8

在生产环境中,为了让某个类型的数据都放在同一个分区中,一般都会指定一个Key

9.2 自定义分区策略

9.2.1创建自定义分区规则

创建方法类Mypartitioner

package kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class Mypartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String msgValue = o1.toString();

        int partition;
        if (true == msgValue.contains("fox")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void configure(Map<String, ?> map) {
    }

    @Override
    public void close() {

    }
}
架构能力Kafka系列(一)Kafka简介插图9

9.2.2 关联自定义分区器

package kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducerPartCallbacks {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();
        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //===关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,Mypartitioner.class.getName());

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);


        //2 发送数据 可以指定分区,如果没有指定,会根据Key的Hashcode取余
        for(int i=0 ;i <5; i++){

            kafkaProducer.send(new ProducerRecord<>("first", "stock" ,"fox" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                    if(e==null){
                        System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
                    }
                }
            });
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图10

十、如何提高生产者的吞吐量

架构能力Kafka系列(一)Kafka简介插图11

提高生产者吞吐量的方法:

  • batch.size:默认是16k
  • linger.ms:延时时间5~10ms,但修改后因为延迟影响数据的及时性
  • comperssion.type:压缩方式snappy(同一数据合并保留最新)
  • RecordAccumulator:缓冲区大小,默认值64m
package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducer {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,3355432);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2 发送数据
        for(int i=0 ;i <5; i++){
            kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图12

十一、数据可靠性问题

保障数据完全可靠的条件:ACK级别设置成-1+分区副本数≥2+ISR应答的最小副本数≥2

架构能力Kafka系列(一)Kafka简介插图13

11.1Kafka的应答级别

Kafka在生产消息成功后会进行应答。称为ACK应答,它分为以下三中情况

  • acks=0,生产者发送过来数据后就不管了,可靠性差,效率高
  • acks=1,生产者发送过来数据后Leader应答,可靠性中等,效率中等
  • acks=-1,生产者发送过来数据,Leader和ISR队列里所有Follower应答,可靠性高,效率低

使用场景:

  • acks=0,主要使用测试
  • acks=1,一般用于传输普通日志,允许可别数据丢失
  • acks=-1,对可靠性要求比较高的场景

特别注意:acks=-1(all)会出现数据重复的问题

架构能力Kafka系列(一)Kafka简介插图14

如果所示:有三个分区的情况,第一个Leader和其中一个Follower正常传输消息,但Leader异常;此时其中一个Follower会转成Leader重新传输,就会出现重复的消息。

11.2 如何修改acks应答级别

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducer {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        //重复次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        
        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        //2 发送数据
        for(int i=0 ;i <5; i++){
            kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图15

11.3、如何保障数据重复性问题

生产者:

  • 幂等性
  • acks=all
  • 事务处理

幂等性原理

架构能力Kafka系列(一)Kafka简介插图16

使用事务:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomeProducer {

    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        //指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");
        //重复次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        //1 创建kafka的消息生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //开启事务处理
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();

        //2 发送数据
        try {
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "fox" + i));
            }
            kafkaProducer.commitTransaction();
        }catch(Exception e){
            kafkaProducer.abortTransaction();
        }finally {
            kafkaProducer.close();
        }
        //3 关闭资源
        kafkaProducer.close();
    }
}
架构能力Kafka系列(一)Kafka简介插图17

消费者:

  • 事务处理

十二、消费者

默认的情况:消费者启动后,生产者之前生产的数据不消费;即只消费启动后的增量数据

shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName

如何让消费者也能消费历史的数据呢?

shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName --from-beginning