Kafka系列(二)Broker

一、如何新增broker

1.1、初始化服务器

修改hostname、安装JDK、安装ZooKeeper、

1.2安装Kafka

如果Kafka是通过之前的服务器复制过去的,注意删除/kafka/datas /kafka/logs

修改/data/app/kafka/server.properties的borker.id=*(注意不能重复)

注意:新增的节点不会接管之前的数据,如果想接管之前的数据,需要使用负载均衡来处理。

1.3使用负载均衡设置新增节点接管之前的消息

1.3.1 创建一个需要负载均衡的主题

shell>vim topic-to-move.json
###content
{
  "topics":[
    {"topic":"first"},
    {"topic":"second"}
  ]
}

1.3.2 创建一个负载均衡计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topic-to-move.json --lroker-list "0,1,2,3" --generate
架构能力Kafka系列(二)Broker插图

1.3.3创建副本存储计划

把1.3.2返回的信息,创建一个json文件保存

shell>vim increase-replication-factor.json
###内容即1.3.2返回的信息

1.3.4 执行副本存储计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092,hadoop105:9092 --reassignment-json-file increase-replication-factor.json --verify

1.3.5验证查看主题详情验证

shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092 --topic first --describe

二、如何退役旧的broker

2.1、查看节点上有哪些主题

例如我们现在查看hostmane为“hadoop103”的主题

shell>cd /data/app/kafka
shell>cd bin/kafka-topic.sh --bootstrap-server hadoop103:9092 --list

2.2使用负载均衡的方式去除该节点的主题

2.2.1创建负载均衡主题

  • hadoop102:broker.id=0
  • hadoop103:broker.id=1
  • hadoop:104:broker.id:2

例如在hadoop103主机上有两个主题“first、second”

shell>cd /data/app/kafka
shell>vim topic-to-move.json
####content
{
  "topics":[
    {"topic":"first"},{"topic":"second"}
  ]
}

2.2.2创建负载均衡计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --topics-to-move-json-file topics-to-move.json --broker-list="0,2" --generate

2.2.3创建副本存储计划

shell>cd /data/app/kafka
shell>vim increase-relication-factor.json
####内容即2.2.2返回的内容,可参考1.3.2

2.2.4执行副本存储计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --execute

2.2.5验证副本存储计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop103:9092 --reassignment-json-file increase-replication-factor.json --verify

2.3、关闭Kafka/Zookeeper

先停止Kafka再停止Zookeeper

shell>cd /data/app/kafka
shell>bin/kafka-server-stop.sh

三、如何修改主题Topic的副本数

先前我们讲到如何通过命令的方式提高分区,刚第二点讲到如何退役分区

3.1创建副本存储计划

shell>cd /data/app/kafka
shell>vim increase-replication-factor.json
####content
{
  "version":1,
  "partitions":[
    {"topic":"first","partition":0,"replications":[0,1,2]},
    {"topic":"first","partition":1,"replications":[0,1,2]},
    {"topic":"first","partition":2,"replications":[0,1,2]},
  ]
}

3.2执行副本存储计划

shell>cd /data/app/kafka
shell>bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --reassignment-json-file increase-replication-factor.json --execute

3.3、查看主题Topic的详情验证

shell>cd /data/app/kafka
shell>bin/kafka-topic.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first --describe

四、Kafka的文件清理策略

Kafka的文件清理策略默认7天,负责设置检查周期,默认5分钟

对待过期消息有两种处理方法

  • 删除:delete
  • 压缩:compact