一、如何新增broker
1.1、初始化服务器
修改hostname、安装JDK、安装ZooKeeper、
1.2安装Kafka
如果Kafka是通过之前的服务器复制过去的,注意删除/kafka/datas /kafka/logs
修改/data/app/kafka/server.properties的borker.id=*(注意不能重复)
注意:新增的节点不会接管之前的数据,如果想接管之前的数据,需要使用负载均衡来处理。
1.3使用负载均衡设置新增节点接管之前的消息
1.3.1 创建一个需要负载均衡的主题
shell>vim topic-to-move.json
###content
{
"topics":[
{"topic":"first"},
{"topic":"second"}
]
}
1.3.2 创建一个负载均衡计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topic-to-move.json --lroker-list "0,1,2,3" --generate
1.3.3创建副本存储计划
把1.3.2返回的信息,创建一个json文件保存
shell>vim increase-replication-factor.json
###内容即1.3.2返回的信息
1.3.4 执行副本存储计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092,hadoop105:9092 --reassignment-json-file increase-replication-factor.json --verify
1.3.5验证查看主题详情验证
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic first --describe
二、如何退役旧的broker
2.1、查看节点上有哪些主题
例如我们现在查看hostmane为“hadoop103”的主题
shell>cd /data/app/kafka
shell>cd bin/kafka-topic.sh --bootstrap-server hadoop103:9092 --list
2.2使用负载均衡的方式去除该节点的主题
2.2.1创建负载均衡主题
- hadoop102:broker.id=0
- hadoop103:broker.id=1
- hadoop:104:broker.id:2
例如在hadoop103主机上有两个主题“first、second”
shell>cd /data/app/kafka
shell>vim topic-to-move.json
####content
{
"topics":[
{"topic":"first"},{"topic":"second"}
]
}
2.2.2创建负载均衡计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --topics-to-move-json-file topics-to-move.json --broker-list="0,2" --generate
2.2.3创建副本存储计划
shell>cd /data/app/kafka
shell>vim increase-relication-factor.json
####内容即2.2.2返回的内容,可参考1.3.2
2.2.4执行副本存储计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --execute
2.2.5验证副本存储计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --verify
2.3、关闭Kafka/Zookeeper
先停止Kafka再停止Zookeeper
shell>cd /data/app/kafka
shell>bin/kafka-server-stop.sh
三、如何修改主题Topic的副本数
先前我们讲到如何通过命令的方式提高分区,刚第二点讲到如何退役分区
3.1创建副本存储计划
shell>cd /data/app/kafka
shell>vim increase-replication-factor.json
####content
{
"version":1,
"partitions":[
{"topic":"first","partition":0,"replications":[0,1,2]},
{"topic":"first","partition":1,"replications":[0,1,2]},
{"topic":"first","partition":2,"replications":[0,1,2]},
]
}
3.2执行副本存储计划
shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --reassignment-json-file increase-replication-factor.json --execute
3.3、查看主题Topic的详情验证
shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first --describe
四、Kafka的文件清理策略
Kafka的文件清理策略默认7天,负责设置检查周期,默认5分钟
对待过期消息有两种处理方法
- 删除:delete
- 压缩:compact