zookeeper & kafka部署和使用

zookeeper

简介

ZooKeeper是一个开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现。ZooKeeper为分布式应用提供一致性服务,提供的功能包括:分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)、分布式锁(Distributed Lock)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。

ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

ZooKeeper主要有领导者(Leader)、跟随者(Follower)和观察者(Observer)三种角色。

角色 说明
领导者(Leader) 为客户端提供读和写的服务,负责投票的发起和决议,更新系统状态。
跟随者(Follower) 为客户端提供读服务,如果是写服务则转发给Leader。在选举过程中参与投票。
观察者(Observer) 为客户端提供读服务器,如果是写服务则转发给Leader。不参与选举过程中的投票,也不参与“过半写成功”策略。在不影响写性能的情况下提升集群的读性能。此角色于zookeeper3.3系列新增的角色。

部署

根据官方说明,如果使用java 1.8的版本的话,需要使用u211以上的版本,但实测u161也是可以使用的。

1
2
3
4
5
6
7
8
9
10
yum install java-1.8.0-openjdk -y

# https://zookeeper.apache.org/releases.html
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

tar zxf apache-zookeeper-3.6.2-bin.tar.gz
cp -r apache-zookeeper-3.6.2-bin /usr/local/zookeeper-3.6.2

cd /usr/local/zookeeper-3.6.2/conf
cp zoo_sample.cfg zoo.cfg

zoo.cfg的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.6.2/data
clientPort=2181

maxClientCnxns=512
autopurge.snapRetainCount=30
autopurge.purgeInterval=2

server.13=192.168.0.13:2888:3888
server.14=192.168.0.14:2888:3888
server.15=192.168.0.15:2888:3888
  • tickTime:心跳基本时间单位,毫秒级,ZK基本上所有的时间都是这个时间的整数倍。

  • initLimit:tickTime的个数,表示在leader选举结束后,followers与leader同步需要的时间,如果followers比较多或者说leader的数据灰常多时,同步时间相应可能会增加,那么这个值也需要相应增加。当然,这个值也是follower和observer在开始同步leader的数据时的最大等待时间(setSoTimeout)

  • syncLimit:tickTime的个数,这时间容易和上面的时间混淆,它也表示follower和observer与leader交互时的最大等待时间,只不过是在与leader同步完毕之后,进入正常请求转发或ping等消息交互时的超时时间
  • dataDir:内存数据库快照存放地址,如果没有指定事务日志存放地址(dataLogDir),默认也是存放在这个路径下,建议两个地址分开存放到不同的设备上。
  • clientPort:配置ZK监听客户端连接的端口
  • dataLogDir:将事务日志存储在该路径下,比较重要,这个日志存储的设备效率会影响ZK的写吞吐量。
  • server.serverid=host:tickpot:electionport
    • server:固定写法
    • serverid:每个服务器的指定ID(必须处于1-255之间,必须每一台机器不能重复)
    • host:主机名,直接用IP更容易识别
    • tickpot:心跳通信端口
    • electionport:选举端口
    • 这里的serverid要写入到/usr/local/zookeeper-3.6.2/data/myid下,注意每台机器都不一样。

由于/usr/local/zookeeper-3.6.2/bin目录下面是zookeeper的运行命令所在位置,可以将他加入到系统变量中去。

1
2
3
[root@iloqg8n3yb9mje data]# cat /etc/profile.d/zookeeper.sh 
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.6.2/
export PATH=$PATH:$ZOOKEEPER_HOME/bin

服务启动

启动服务,使用zkServer.sh start,但我们可以使用以下systemd来接管。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ systemctl cat zookeeper
# /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/zookeeper-3.6.2/bin/zkServer.sh start /usr/local/zookeeper-3.6.2/conf/zoo.cfg
ExecStop=/usr/local/zookeeper-3.6.2/bin/zkServer.sh stop /usr/local/zookeeper-3.6.2/conf/zoo.cfg

[Install]
WantedBy=multi-user.target

集群状态查看

使用zkServer.sh status查看集群状态;需要等三台全部启动之后才会显示MODE的状态,如果只启动一台的话,就会显示Error contacting service. It is probably not running.

1
2
3
4
5
6
7
8
9
[root@iabnt3f4lvt0va data]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@iabnt3f4lvt0va data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.6.2/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader # 备显示为follower

如果三台都启动了,但没有显示mode,可以查看/usr/local/zookeeper-3.6.2/logs下面的日志进行排查。

zookeeper的操作,运行/usr/local/zookeeper-3.6.2/bin/zkCli.sh -server 192.168.0.14

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /cpu intel
Created /cpu
[zk: localhost:2181(CONNECTED) 2] ls /
[cpu, zookeeper]
[zk: localhost:2181(CONNECTED) 3] get /cpu
intel
[zk: localhost:2181(CONNECTED) 10] get -s /cpu
intel
cZxid = 0x200000007
ctime = Mon Jan 04 10:47:26 CST 2021
mZxid = 0x200000007
mtime = Mon Jan 04 10:47:26 CST 2021
pZxid = 0x200000007
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] set /cpu AMD
[zk: localhost:2181(CONNECTED) 5] get /cpu
AMD
[zk: localhost:2181(CONNECTED) 6] delete
delete deleteall
[zk: localhost:2181(CONNECTED) 6] delete /cpu
[zk: localhost:2181(CONNECTED) 7] ls /
[zookeeper]

zxid:一个事务编号,zookeeper集群内部的所有事务,都有一个全局的唯一的顺序的编号,由两部分组成: 就是一个 64位的长整型 long:

  • 高32位: 用来标识leader关系是否改变,如 0x2
  • 低32位: 用来做当前这个leader领导期间的全局的递增的事务编号,如 00000007
  • get -s可以使用查看zxid的详细数据。
状态属性 说明
cZxid 数据节点创建时的事务ID
ctime 数据节点创建时的时间
mZxid 数据节点最后一次更新时的事务ID
mtime 数据节点最后一次更新时的时间
pZxid 数据节点的子节点列表最后一次被修改(是子节点列表变更,而不是子节点内容变更)时的事务ID
cversion 子节点的版本号
dataVersion 数据节点的版本号
aclVersion 数据节点的ACL版本号
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的SessionID;如果节点是持久节点,则该属性值为0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

https://zookeeper.apache.org/doc/current/zookeeperStarted.html

Zookeeper学习之路 (二)集群搭建

ZooKeeper集群部署指南

kafka

Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据等)

Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

概念/对象 简单说明
Broker Kafka节点
Topic 主题,用来承载消息,可以理解为数据库里面的表
Partition 分区,用于主题分片存储,可以理解为多机器存储,比如说设置分区为3,kafka会根据一定的算法动态存储到不同的broker上
Producer 生产者,向主题发布消息的应用
Consumer 消费者,从主题订阅消息的应用
Consumer Group 消费者组,由多个消费者组成

安装

下载

1
2
3
4
5
6
cd /usr/local/src
# http://kafka.apache.org/downloads.html
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz
tar zxf kafka_2.12-2.7.0.tgz
cp -r kafka_2.12-2.7.0 /usr/local/kakfa
cd /usr/local/kafka/config

修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ cat server.properties |grep -v '^#' |grep -v '^$'
# Broker唯一标识,和zookeeper的myid性质一样
broker.id=13
# 监听信息,PLAINTEXT表示明文传输,这个最好是直接配置IP进行通信
listeners=PLAINTEXT://192.168.0.13:9092
# borker进行网络处理的线程数
num.network.threads=3
# borker进行I/O处理的线程数
num.io.threads=8
# 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
# kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
# 这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
# kafka数据存放地址,可以填写多个。用”,”间隔
log.dirs=/usr/local/kafka/logs
# 默认分区数
num.partitions=1
# 每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
# kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件,默认为1G
log.segment.bytes=1073741824
# 每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
# ZooKeeper服务器地址,多台用”,”间隔
zookeeper.connect=192.168.0.13:2181,192.168.0.14:2181,192.168.0.15:2181
# 设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

启动服务

命令运行之前,最好是把软件包配置到其他机器上再来运行。

运行命令kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties就可以运行命令了。第一次运行时,可能会报错,可以将daemon先去掉,这样就可以输出报错日志了。

另外可以使用systemctl来管理服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 修改环境变量
$ cat /etc/profile.d/kafka.sh
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin


$ systemctl cat kafka
# /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target kafka-zookeeper.service

[Service]
Type=simple
User=root
Group=root
#Environment=JAVA_HOME=/usr/java/jdk1.8.0_102
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

相关操作

创建Topic

随便找一台机器创建测试Tpoic:test,这里我们指定了3个副本、1个分区:

1
2
3
4
5
6
7
8
9
[root@4n1eq6wnfvdwvj logs]# kafka-topics.sh --create --bootstrap-server 192.168.0.14:9092 --replication-factor 3 --partitions 1 --topic test
Created topic test.
# 三台机器上都有这个tpoic
[root@4n1eq6wnfvdwvj logs]# kafka-topics.sh --list --bootstrap-server 192.168.0.14:9092
test
[root@4n1eq6wnfvdwvj logs]# kafka-topics.sh --list --bootstrap-server 192.168.0.15:9092
test
[root@4n1eq6wnfvdwvj logs]# kafka-topics.sh --list --bootstrap-server 192.168.0.13:9092
test

生产数据

运行命令之后,输入内容就是一条消息:

1
2
3
4
[root@4n1eq6wnfvdwvj logs]# kafka-console-producer.sh --broker-list 192.168.0.13:9092 --topic test
>test by wumingx
>date:20210114
>

消费数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 从头开始消费
[root@iloqg8n3yb9mje ~]# kafka-console-consumer.sh --bootstrap-server 192.168.0.14:9092 --topic test --from-beginning
test by wumingx
date:20210114
^CProcessed a total of 2 messages
[root@iloqg8n3yb9mje ~]# kafka-console-consumer.sh --bootstrap-server 192.168.0.15:9092 --topic test --from-beginning
test by wumingx
date:20210114
^CProcessed a total of 2 messages
[root@iloqg8n3yb9mje ~]#
[root@iloqg8n3yb9mje ~]#
[root@iloqg8n3yb9mje ~]# kafka-console-consumer.sh --bootstrap-server 192.168.0.13:9092 --topic test --from-beginning
test by wumingx
date:20210114
^CProcessed a total of 2 messages
[root@iloqg8n3yb9mje ~]#

# 可以对consumer进行分组,那么只允许组内的一个消费者进行消费,其他消费者则不行
[root@iloqg8n3yb9mje ~]# kafka-console-consumer.sh --bootstrap-server 192.168.0.13:9092 --topic test --from-beginning --group testgroup
test by wumingx
date:20210114
^CProcessed a total of 2 messages
[root@iloqg8n3yb9mje ~]#
[root@iloqg8n3yb9mje ~]# kafka-console-consumer.sh --bootstrap-server 192.168.0.14:9092 --topic test --from-beginning --group testgroup
^CProcessed a total of 0 messages

查看leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 创建一个名为cpu的topic,分为4个分区,3副本
[root@iloqg8n3yb9mje logs]# kafka-topics.sh --create --bootstrap-server 192.168.0.13:9092 --replication-factor 3 --partitions 4 --topic cpu
# 查看topic列表,只有名字
[root@iloqg8n3yb9mje logs]# kafka-topics.sh --list --bootstrap-server 192.168.0.14:9092
__consumer_offsets
cpu
test
[root@iloqg8n3yb9mje logs]# kafka-topics.sh --list --zookeeper 192.168.0.14:2181
__consumer_offsets
cpu
test

#查看cpu这个topic的属性,可以看到分区0的leader是在14上,分区1是在15上。
[root@iloqg8n3yb9mje logs]# kafka-topics.sh --describe --zookeeper 192.168.0.14:2181 --topic cpu
Topic: cpu PartitionCount: 4 ReplicationFactor: 3 Configs:
Topic: cpu Partition: 0 Leader: 14 Replicas: 14,15,13 Isr: 14,15,13
Topic: cpu Partition: 1 Leader: 15 Replicas: 15,13,14 Isr: 15,13,14
Topic: cpu Partition: 2 Leader: 13 Replicas: 13,14,15 Isr: 13,14,15
Topic: cpu Partition: 3 Leader: 14 Replicas: 14,13,15 Isr: 14,13,15

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。

kafka in zookeeper

查看id和topics

/brokers/topics/[topic] :存储某个topic的partitions所有分配信息

运行zkCli.sh

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids 
[13, 14, 15] # 表示三台机器
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics
[__consumer_offsets, cpu, test] # 有cpu test这2个topics
[zk: localhost:2181(CONNECTED) 2] get /brokers/topics/cpu
{"version":2,"partitions":{"2":[13,14,15],"1":[15,13,14],"0":[14,15,13],"3":[14,13,15]},"adding_replicas":{},"removing_replicas":{}}
[zk: localhost:2181(CONNECTED) 7] get /brokers/ids/13
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.0.13:9092"],"jmx_port":-1,"features":{},"host":"192.168.0.13","timestamp":"1609747952920","port":9092,"version":5}

partition状态信息

/brokers/topics/[topic]/partitions/[0…N] 其中[0..N]表示partition索引号

/brokers/topics/[topic]/partitions/[partitionId]/state

1
2
3
4
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/cpu/partitions 
[0, 1, 2, 3]
[zk: localhost:2181(CONNECTED) 6] get /brokers/topics/cpu/partitions/0/state
{"controller_epoch":6,"leader":14,"version":1,"leader_epoch":0,"isr":[14,15,13]}

最后一条表示partitions 0这个分区的leader在14这台机器上面;controller_epoch表示kafka集群中的中央控制器选举次数;leader_epoch表示 该partition leader选举次数。

controller信息

  • controller_epoch:此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;
  • controller:存储center controller中央控制器所在kafka broker的信息
1
2
3
4
[zk: localhost:2181(CONNECTED) 8] get /controller_epoch 
6
[zk: localhost:2181(CONNECTED) 9] get /controller
{"version":1,"brokerid":13,"timestamp":"1609747992848"}

Kafka集群部署指南

Kafka学习之路 (五)Kafka在zookeeper中的存储

为什么Kafka速度那么快

apache kafka技术分享系列(目录索引)

  • 本文作者: wumingx
  • 本文链接: https://www.wumingx.com/linux/zk-kafka.html
  • 本文主题: zookeeper & kafka部署和使用
  • 版权声明: 本站所有文章除特别声明外,转载请注明出处!如有侵权,请联系我删除。
0%