前言
此篇文章转载自 Kafka学习(六)Kafka集群的迁移与扩容) ,目前很符合我的预期了,如有需要,可以直接到原站点查看。
Kafka的集群扩容实际上就是把Topic的Partition移动到新加的集群节点上。我们只需要copy一份Kafka的安装目录到新的节点机器上,修改一下相关配置文件(broker.id,logs.dir等等),但是新添加的Kafka节点机器是不会自动分配数据的,所以需要我们手动操作。
Kafka的扩容可以细分为以下三种:
- 增加节点
- 增加Topic副本
- 增加分区
具体的步骤有两种方式:
- 通过
–topics-to-move-json-file和–broker-list批量生成新的Topic分区信息,然后根据该信息执行转移操作。 - 手动写要移动的topic信息,更灵活,但是在大量Topic和Partition的情况下非常繁琐并且容易出错。
1 | kafka-reassign-partitions.sh命令的三种模式 |
Kafka动态增加节点(Node)
新添加的Kafka节点机器是不会自动分配数据的,所以无法分担集群的负载,除非我们新建一个Topic,此时新的Topic会使用新添加的Kafka节点机器。如果我们想新添加的Kafka节点机器能够分担集群存储,需要手动将部分分区移动到新添加的Kafka节点机器上。
我们原来的Kafka节点分别是0,1,2,现在要加入3,4两个新节点
topicMove.json
1 | { |
生成迁移的计划
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topicMove.json --broker-list "0,1,2,3,4" --generate |
上面是原来的所有partition在各个节点的分布情况,下面是加入4,5两个新节点之后所有partition在各个节点的分布情况
将生成的执行计划保存为add_node.json文件
重新分配partition(其实这里是添加新的节点)
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_node.json --execute |
查看执行的状态
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_node.json --verify |
假设出现类似这样的错误,他并不是真的出错,而是指目前仍在复制数据中。再过一段时间再运行verify命令,他就会消失(加入完成拷贝)
当然也可以不使用generate先生成执行计划,而是自己直接手动编辑生成add_node.json文件内容,然后直接execute执行,但是这样容易出错,Kafka节点比较少的时候推荐使用。
注意:其实通过generate的结果我们也可以看出,其实不管是扩容,减容,迁移,其实都是重新分配Topic或者Partition的过程。json文件的内容都是指定Topic下的Partition要移动到哪个Node上。
Kafka动态增加Topic副本(Replication)
查看当前node_log的Topic信息
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
add_replication.json
1 | { |
重新分配partition(其实这里是添加副本replication)
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_replication.json --execute |
查看执行的状态
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_replication.json --verify |
如果遇到下面的错误很有可能是json文件格式有错误,仔细检查修正重新运行即可
1 | Partitions reassignment failed due to Partition reassignment data file add_replication.json is empty |
查看添加replication后的node_log的Topic信息
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
Kafka动态增加分区(Parition)
查看当前node_log的Topic信息
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
add_partition.json
1 | { |
重新分配partition
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_partition.json --execute |
查看执行的状态
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_partition.json --verify |
报错是因为我们只有一个partition0,没有partition1,partition2,所以跳过了重新分配partition的过程。
我们需要先增加partition的数量,我们把partition的数量变成6个
1 | $ kafka-topics.sh --zookeeper localhost:2181 --alter --topic node_log --partitions 6 |
查看添加partition后的node_log的Topic信息
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
重新执行reassign,然后查看执行状态
1 | $ kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/Downloads/add_partition.json --verify |
重新分配执行完成之后,再次查看node_log的Topic信息
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
执行reassign之后,我们发现0-3的partition的leader都是0,这是因为我们之前突然扩展partition到6个,而我们在reassign的之后只指定了0-2的partition的分配,这样是不合理的,所以我们使用kafka自带的重分区工具进行处理,即上文的 --generate:
1 |
|
这里使用了一个简单的脚本进行数据的迁移,只需要只topic名写到/tmp/topics.txt即可进行了。此脚本虽然没有做其他的异常判断,但也是可以进行topic的批量迁移的。
assigned replicas和preferred replica
但是这样我们发现还是有点小问题,partition2的leader仍然是0,这是为什么呢?。这里先普及一下assigned replicas和preferred replica。
每个partitiion的所有replicas叫做”assigned replicas”,”assigned replicas”中的第一个replicas叫”preferred replica”,刚创建的topic一般”preferred replica”是leader。leader replica负责所有的读写。但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。
我们这里preferred replica已经是2了,但是leader却不是2,这样我们需要重新选举一下leader,需要使用kafka-preferred-replica-election.sh来调整。
两种操作方式:
- 对所有Topics进行操作
1 | $ kafka-preferred-replica-election.sh --zookeeper localhost:2181 |
- 对某个Topic进行操作(json文件中指定要操作的Topic)
1 | $ kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file json_file |
注意:这里我们只需要调整node_log的Topic的partition2的leader。
leaderTopic.json
1 | { |
重新选举leader
1 | $ kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file leaderTopic.json |
再次查看Topic情况,发现leader也是我们所期望的均匀分配了
1 | $ kafka-topics.sh --describe --zookeeper localhost:2181 --topic node_log |
修改topic配置
kafka有一个全局配置定义了全部topic的属性,我们也可以对某些特定的topic进行一些特殊的配置。
单独修改一个topic的配置:
1 | # 查看配置 |
此外,使用命令操作kafka也有可能会不太友好,可以使用Kafka Manager工具来实现界面化管理,有兴趣的朋友可以试试。