菜单
一、Kafka的简介
Kafka:是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
发布/订阅:消息发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的主题(topic),订阅着只接收感兴趣(topic)的消息
常用例子:数据埋点:记得购买者的行为数据(浏览、点赞、收藏、评论等)
二、消息队列的作用:
- 缓存/削峰
- 解耦(例如下单、扣库存、扣款)
- 异步通讯
三、消息队列的两种模式
- 点对点模式:消费者主动拉取数据,消息收到后清除消息
- 发布/订阅模式:可以有多个topic主题;消费者消费数据后,不删除数据;每个消费者相互独立,都可以消费数据
四、Kafka的集群模式
为了提高吞吐量,Kafka都会启用集群模式,并且每个主题多个分区多个副本。
多个分区:为了提高吞吐量;多个副本:为了保障数据安全
主要在2.8后,Zookeeper将会被舍弃。使用Kraft
五、Kafka的安装(Zookeeper模式)
5.1环境准备
准备3台服务器,并且修改hostmane
5.2三台服务器安装Zookeeper
5.3下载Kafka(所用服务器)
下载Kafka的压缩包
5.3.1解压文件
shell>unzip kafka.zip /data/app/kafka
5.3.2 修改server.properties文件
shell>vim /data/app/kafka/config/server.properties
####修改broker.id(此ID不能重复),例如第一台服务器0,第二台设置成1....
borker.id=0
#####修改kafka日志保存位置
log.dirs=/data/app/kafka/logs
#####修改zookeeper.connect。例如三台服务器的hostname:hadoop102/hadoop103/hadoop104
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
5.3.3设置环境变量
shell>vim /etc/profile.d/my_env.sh
###KAFKA_HOME
export KAFKA_HOME=/data/app/kafka
export PATH=$KAFKA_HOME/bin
shell>source /etc/profile
6、启动kafka
shell>cd /data/app/kafka
shell>bin/kafka-server-start.sh -deamon config/server.properties
启动完成
六、创建批量开启和关停脚本
shell>cd /data/app/kafka
shell>vim kfk.sh
####content
#!/bin/sh
case $1 in
"start")
for i in hadoop102 hadoop102 hadoop104
do
echo '----启动 $i kafka'
ssh $i "/data/app/kafka/bin/kafka-server-start.sh -daemon /data/app/kafka/config/server.properties"
done
;;
"stop")
for i in hadoop102 hadoop103 hadoop104
do
echo "---停止 $i kafka ---"
ssh $i "/data/app/kafka/bin/kafka-server-stop.sh -daemon /data/app/kafka/config/server-properties"
done
;;
esac
####脚本结束
shell>chome 777 kfk.sh
6.1 批量启动命令
shell>kfk.sh start
6.2 批量关停命令
shell>kfk.sh stop
七、Kafka的主题Topic
7.1 创建主题Topic
- –topic:主题名
- –partitions:分区(提升性能,提升效率)
- –relication:副本数(数据安全,备份防丢失)
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadop103:9092,hadoop104:9092 --topic topicName --create --partitions 3 --replication-factor 3
7.2 查看主题列表
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop1049092 --list
7.3查看主题的详细明细信息
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic topicName --describe
- PartitionCount:副本总数
- Partition:从0开始
- Leader:哪个Broker作为主方
- Replicas:副本数
例如上次TopicA有三个分区,三个副本都分别存在在不同的Broker。
通过上图可以看出:TopicA的“leader”:0,“isr”:[0,2],即broker的TopicA-Partition0作为主方,broker2的TopicA-Partition0作为备用方
7.4修改分区(分区只能增加不能减少)
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic topicName --alter --partitions 4
即:把副本书改成4
7.5修改副本数
八、kafka的生产者
8.1、命令模式生产消息
shell>cd /data/app/kafka
shell>bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName
>>hello
###即生产了一个“hello”消息
8.2、Maven项目调用Kafka生产消息
新建一个maven项目
8.2.1 引入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
8.2.2 普通异步发送
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducer {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
}
//3 关闭资源
kafkaProducer.close();
}
}
8.2.3 带回调的异步发送
package kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducerCallback {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first", "fox" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
}
}
});
}
//3 关闭资源
kafkaProducer.close();
}
}
8.2.4 同步发送
在普通异步发送的基础上,增加.get()
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomeProducerSync {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据 同步发送的话,只要后面追加.get()
for(int i=0 ;i <5; i++){
try {
kafkaProducer.send(new ProducerRecord<>("first","fox"+i)).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//3 关闭资源
kafkaProducer.close();
}
}
九、Kafka的分区策略
9.1 常用分区策略
package kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducerPartCallback {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据 可以指定分区,如果没有指定,会根据Key的Hashcode取余
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first", "stock" ,"fox" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
}
}
});
}
//3 关闭资源
kafkaProducer.close();
}
}
在生产环境中,为了让某个类型的数据都放在同一个分区中,一般都会指定一个Key
9.2 自定义分区策略
9.2.1创建自定义分区规则
创建方法类Mypartitioner
package kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class Mypartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
String msgValue = o1.toString();
int partition;
if (true == msgValue.contains("fox")) {
partition = 0;
} else {
partition = 1;
}
return partition;
}
@Override
public void configure(Map<String, ?> map) {
}
@Override
public void close() {
}
}
9.2.2 关联自定义分区器
package kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducerPartCallbacks {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//===关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,Mypartitioner.class.getName());
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据 可以指定分区,如果没有指定,会根据Key的Hashcode取余
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first", "stock" ,"fox" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("topic:"+recordMetadata.topic()+" 分区:"+recordMetadata.partition());
}
}
});
}
//3 关闭资源
kafkaProducer.close();
}
}
十、如何提高生产者的吞吐量
提高生产者吞吐量的方法:
- batch.size:默认是16k
- linger.ms:延时时间5~10ms,但修改后因为延迟影响数据的及时性
- comperssion.type:压缩方式snappy(同一数据合并保留最新)
- RecordAccumulator:缓冲区大小,默认值64m
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducer {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,3355432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
//压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
}
//3 关闭资源
kafkaProducer.close();
}
}
十一、数据可靠性问题
保障数据完全可靠的条件:ACK级别设置成-1+分区副本数≥2+ISR应答的最小副本数≥2
11.1Kafka的应答级别
Kafka在生产消息成功后会进行应答。称为ACK应答,它分为以下三中情况
- acks=0,生产者发送过来数据后就不管了,可靠性差,效率高
- acks=1,生产者发送过来数据后Leader应答,可靠性中等,效率中等
- acks=-1,生产者发送过来数据,Leader和ISR队列里所有Follower应答,可靠性高,效率低
使用场景:
- acks=0,主要使用测试
- acks=1,一般用于传输普通日志,允许可别数据丢失
- acks=-1,对可靠性要求比较高的场景
特别注意:acks=-1(all)会出现数据重复的问题
如果所示:有三个分区的情况,第一个Leader和其中一个Follower正常传输消息,但Leader异常;此时其中一个Follower会转成Leader重新传输,就会出现重复的消息。
11.2 如何修改acks应答级别
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducer {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重复次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//2 发送数据
for(int i=0 ;i <5; i++){
kafkaProducer.send(new ProducerRecord<>("first","fox"+i));
}
//3 关闭资源
kafkaProducer.close();
}
}
11.3、如何保障数据重复性问题
生产者:
- 幂等性
- acks=all
- 事务处理
幂等性原理
使用事务:
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomeProducer {
public static void main(String[] args) {
//0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
//指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重复次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
//1 创建kafka的消息生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
//开启事务处理
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
//2 发送数据
try {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "fox" + i));
}
kafkaProducer.commitTransaction();
}catch(Exception e){
kafkaProducer.abortTransaction();
}finally {
kafkaProducer.close();
}
//3 关闭资源
kafkaProducer.close();
}
}
消费者:
- 事务处理
十二、消费者
默认的情况:消费者启动后,生产者之前生产的数据不消费;即只消费启动后的增量数据
shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName
如何让消费者也能消费历史的数据呢?
shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topicName --from-beginning