rocketMQ集群化部署以及常见运维命令

安装

打开 官方Quick Start 下载带bin版本的rocketmq,这样就不需要再次编译安装了,可以直接使用。如果下载的是带source的下载包的话,则必须使用 mvn -Prelease-all -DskipTests clean install -U 来打包。

1
wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

然后解压到指定目录即可。如果只要单机启动的话,就比较简单,运行以下命令即可:

1
2
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c ./conf/broker.conf &

集群部署配置

模式介绍

一般部署方法有以下几种:

  • 单Master模式:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
  • 多Master模式:一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
    • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
  • 多Master多Slave模式-异步复制:每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
  • 多Master多Slave模式-同步双写:每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
    • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
    • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
  • Dledger集群:RocketMQ-on-DLedger Group 是指一组「相同名称的 Broker」,至少需要 3 个节点,通过 「Raft」 自动选举出一个 Leader,其余节点作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。

同步双写

在conf目录下有以下目录,分别代表了不同的模式,2m-2s-async 表示多Master多Slave模式-异步复制,2m-2s-sync 表示多Master多Slave模式-同步复制,2m-noslave 表示多Master,broker.conf 表示单机配置,还有一种是dledger是自动选举模式,必须三台机器

1
2
3
4
5
6
7
[root@8fptpfxk957bjm rocketmq]# ll conf/
total 36
drwxr-xr-x 2 root root 118 Oct 23 2020 2m-2s-async
drwxr-xr-x 2 root root 118 Jun 8 17:45 2m-2s-sync
drwxr-xr-x 2 root root 91 Dec 4 2020 2m-noslave
-rw-r--r-- 1 root root 949 Oct 23 2020 broker.conf
drwxr-xr-x 2 root root 72 Dec 4 2020 dledger

我们以同步双写的方法来配置。

配置解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#mq-master01节点配置/data/rocketmq/conf/2m-2s-sync/broker-a.properties
[root@mq-master01 ~]# vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.10.207:9876;
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/data/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER #要配置为MASTER或SLAVE的角色
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

#mq-master02节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-b.properties #就下面三行配置不一样,其他配置行都一样!
[root@mq-master02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b.properties
......
brokerName=broker-b
brokerId=0
brokerRole=MASTER

#mq-slave01节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
[root@mq-slave01 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
......
brokerName=broker-a #注意这一行的名称要和master保持一致
brokerId=1 #这个ID要跟master的不一致!
brokerRole=SLAVE #要配置为从

#mq-slave02节点配置的是/data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
[root@mq-slave02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
......
brokerName=broker-b #注意这一行的名称要和master的保持一致
brokerId=1 #这个ID要跟master的不一致
brokerRole=SLAVE #要配置为从

详细配置

以下是我的环境,2台机器互为主备,这边使用配置文件进行区分,实际上2台机器上面的配置文件名是可以一样的。

配制文件 IP 角色
broker-a.properties 172.31.0.21 SYNC_MASTER
broker-a-s.properties 172.31.0.22 SLAVE
broker-b.properties 172.31.0.22 SYNC_MASTER
broker-b-s.properties 172.31.0.21 SLAVE

以下是 172.31.0.21 的配置文件,主备关系是通过brokerName+brokerId+brokerRole来确认的,实际上配置文件都是差不多的。同时注意目录路径是有区别的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
[root@8fptpfxk957bjm 2m-2s-sync]# cat broker-a.properties 
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
haListenPort=10912
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/data/broker-a
storePathCommitLog=/usr/local/rocketmq/data/broker-a/commitlog
maxMessageSize=65536
brokerIP1=172.31.0.21
namesrvAddr=172.31.0.21:9876;172.31.0.22:9876
[root@8fptpfxk957bjm 2m-2s-sync]#
[root@8fptpfxk957bjm 2m-2s-sync]# cat broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10923
haListenPort=10924
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/data/broker-b-s
storePathCommitLog=/usr/local/rocketmq/data/broker-b-s/commitlog
maxMessageSize=65536
brokerIP1=172.31.0.21
namesrvAddr=172.31.0.21:9876;172.31.0.22:9876

以下是 172.31.0.22 的配置文件,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
[root@27zrgari9hd937 2m-2s-sync]# cat broker-b.properties 
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
haListenPort=10912
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/data/broker-b
storePathCommitLog=/usr/local/rocketmq/data/broker-b/commitlog
maxMessageSize=65536
brokerIP1=172.31.0.22
namesrvAddr=172.31.0.21:9876;172.31.0.22:9876
[root@27zrgari9hd937 2m-2s-sync]#
[root@27zrgari9hd937 2m-2s-sync]#
[root@27zrgari9hd937 2m-2s-sync]# cat broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10923
haListenPort=10924
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/data/broker-a-s
storePathCommitLog=/usr/local/rocketmq/data/broker-a-s/commitlog
maxMessageSize=65536
brokerIP1=172.31.0.22
namesrvAddr=172.31.0.21:9876;172.31.0.22:9876

同时要注意配置storePathRootDir以及storePathCommitLog的路径。

java启动配置

官方默认的启动脚本所使用的内存比较大,由于是测试环境,需要修改一下:

  • bin/mqnamesrv:对应的脚本是 sh ${ROCKETMQ_HOME}/bin/runserver.sh ,按以下方式修改:

    1
    2
    3
    4
    5
    6
    # 修改内存大小
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    # 默认的日志是配置是在 $user.home/logs/rocketmqlogs/下,需要修改:
    JAVA_OPT="${JAVA_OPT} -Dons.client.logRoot=/usr/local/rocketmq"
    JAVA_OPT="${JAVA_OPT} -Duser.home=/usr/local/rocketmq"
  • bin/mqbroker:对应的脚本是sh ${ROCKETMQ_HOME}/bin/runbroker.sh:

    1
    2
    3
    JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn2g"
    JAVA_OPT="${JAVA_OPT} -Dons.client.logRoot=/usr/local/rocketmq"
    JAVA_OPT="${JAVA_OPT} -Duser.home=/usr/local/rocketmq"

启动脚本

nohup方式管理的方法如下:

  • nameserver
1
2
启动:cd /usr/local/rocketmq/bin && nohup ./mqnamesrv &
停止:cd /usr/local/rocketmq/bin && sh mqshutdown namesrv
  • broker
1
2
启动:cd /usr/local/rocketmq/ && nohup sh bin/mqbroker -c conf/2m-2s-sync/broker-a.properties &
停止:cd /usr/local/rocketmq/ && sh bin/mqshutdown broker

比较麻烦,还是通过systemctl来接管。

根据实际的情况修改以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[root@8fptpfxk957bjm 2m-2s-sync]# systemctl cat rocketmq-namesrv.service 
# /usr/lib/systemd/system/rocketmq-namesrv.service
[Unit]
Description=rocketmq-namesrv
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/rocketmq/bin/mqnamesrv
ExecStop=/usr/local/rocketmq/bin/mqshutdown namesrv

[Install]
WantedBy=multi-user.target

[root@8fptpfxk957bjm 2m-2s-sync]# systemctl cat rocketmq-broker-b-s
# /usr/lib/systemd/system/rocketmq-broker-b-s.service
[Unit]
Description=rocketmq-broker-b-s
After=network.target

[Service]
Type=simple

ExecStart=/usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties
ExecStop=/usr/local/rocketmq/bin/mqshutdown broker

[Install]
WantedBy=multi-user.target

然后start相对应的服务即可。

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 生产消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

# 消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

# broker的写权限关闭
bin/mqadmin updateBrokerConfig -b 192.168.x.x:10911 -n 192.168.x.x:9876 -k brokerPermission -v 4

# 恢复该节点的写权限
bin/mqadmin updateBrokerConfig -b 192.168.x.x:10911 -n 192.168.x.x:9876 -k brokerPermission -v 6

# 查看集群信息,集群、BrokerName、BrokerId、TPS等信息
./bin/mqadmin clusterList -n localhost:9876

# 获取全部topic
./bin/mqadmin topicList -n localhost:9876 -c DevCluster > topiclist

# 获取topic 路由信息
./bin/mqadmin topicRoute -t demo-cluster -n localhost:9876

# 获取topic offset
./bin/mqadmin topicStatus -t demo-cluster -n localhost:9876

# 打印Topic订阅关系、TPS、积累量、24h读写总量等信息
./bin/mqadmin statsAll -n localhost:9876

# 修改broker 参数
./bin/mqadmin updateBrokerConfig -n localhost:9876 -b 10.0.xxx.2:10911 -k waitTimeMillsInSendQueue -v 500 -c TestCluster

# 发送消息
./bin/mqadmin sendMessage -n localhost:9876 -t test -p "this is test"

# 消费
./bin/mqadmin consumeMessage -n localhost:9876 -t test

有关 mqadmin 的命令说明,可以访问: mqadmin管理工具

⁨rocketmq-console部署

rocketmq-console是可以管理rocketmq集群的,先下载rocketmq-externals

1
git clone https://github.com/apache/rocketmq-externals.git

下载会比较慢,自己想办法解决:

修改‎⁨⁨rocketmq-externals⁩/⁨rocketmq-console⁩/⁨src⁩/⁨main⁩/⁨resources⁩/application.properties

1
2
3
server.address=0.0.0.0
server.port=8088
rocketmq.config.namesrvAddr=127.0.0.1:9876

修改‎⁨⁨rocketmq-externals⁩/⁨rocketmq-console/pom.xml,与服务端版本一致

1
<rocketmq.version>4.7.1</rocketmq.version>

打包, 打包期间报错修改pom.xml中的jar 版本

1
2
cd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true

启动 java -jar target/rocketmq-console-ng-2.0.0.jar,访问 http://localhost:8088 即可。

如果想自定义参数,可以直接使用配置文件来做自定义:

  • users.properties,配置控制台用户名和密码。
1
2
3
4
5
# This file supports hot change, any change will be auto-reloaded without Console restarting.
# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin)

# Define Admin
admin=admin,1
  • application.properties:配置访问端口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    server.contextPath=
    server.port=8088

    ### SSL setting
    #server.ssl.key-store=classpath:rmqcngkeystore.jks
    #server.ssl.key-store-password=rocketmq
    #server.ssl.keyStoreType=PKCS12
    #server.ssl.keyAlias=rmqcngkey

    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=127.0.0.1:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/usr/local/rocketmq/data/broker-a
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
    #set the message track trace topic if you don't want use the default one
    rocketmq.config.msgTrackTopicName=
    rocketmq.config.ticketKey=ticket

    #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
    rocketmq.config.loginRequired=true

然后使用 java -jar rocketmq-console-ng-2.0.0.jar -Dspring.config.location=./ 来启动。

使用方法,请参考 RocketMQ系列:rocketmq运维控制台使用详解(全网独家)

mqadmin详解

为了方便命令运行,将 export PATH=$PATH:/usr/local/rocketmq/bin/加入到/etc/profile。

集群状态查看

1
2
3
4
5
6
7
# 集群状态
[root@8fptpfxk957bjm bin]# sh mqadmin clusterList 2>/dev/null
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
DefaultCluster broker-a 0 172.31.0.21:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 450874.19 0.0095
DefaultCluster broker-a 1 172.31.0.22:10923 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 450874.19 0.0110
DefaultCluster broker-b 0 172.31.0.22:10911 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 450874.19 0.0110
DefaultCluster broker-b 1 172.31.0.21:10923 V4_8_0 0.00(0,0ms) 0.00(0,0ms) 0 450874.19 0.0095

topic相关命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# topicList
[root@8fptpfxk957bjm ~]# mqadmin topicList -n localhost:9876 2>/dev/null |egrep -v "OFFSET_MOVED_EVENT|%RETRY%|BenchmarkTest|broker-|DefaultCluster|SELF_TEST_TOPIC"
RMQ_SYS_TRANS_HALF_TOPIC
test
TBW102
SCHEDULE_TOPIC_XXXX
TopicTest

# 查看 Topic 消息队列offset
[root@8fptpfxk957bjm bin]# sh mqadmin topicStatus -t TopicTest 2>/dev/null
#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 125 2021-06-08 17:19:50,639
broker-a 1 0 125 2021-06-08 17:19:50,640
broker-a 2 0 125 2021-06-08 17:19:50,641
broker-a 3 0 125 2021-06-08 17:19:50,641
broker-b 0 0 125 2021-06-08 17:19:49,100
broker-b 1 0 125 2021-06-08 17:19:49,100
broker-b 2 0 125 2021-06-08 17:19:49,101
broker-b 3 0 125 2021-06-08 17:19:49,102

# 查看 Topic 路由信息,数据保存在哪个broker上,读写队列是多少等信息
[root@8fptpfxk957bjm bin]# sh mqadmin topicRoute -t TopicTest 2>/dev/null
{
"brokerDatas":[
{
"brokerAddrs":{0:"172.31.0.22:10911",1:"172.31.0.21:10923"
},
"brokerName":"broker-b",
"cluster":"DefaultCluster"
},
{
"brokerAddrs":{0:"172.31.0.21:10911",1:"172.31.0.22:10923"
},
"brokerName":"broker-a",
"cluster":"DefaultCluster"
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"broker-b",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
},
{
"brokerName":"broker-a",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
}
]
}

[root@8fptpfxk957bjm ~]# mqadmin topicRoute -t TopicTest -n localhost:9876 2>/dev/null |grep readQueueNums
"readQueueNums":4,
"readQueueNums":4,

# 查询TOPIC被哪些Consumer Group订阅了
[root@8fptpfxk957bjm bin]# sh mqadmin statsAll 2>/dev/null
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
RMQ_SYS_TRANS_HALF_TOPIC 0 0.00 0 NO_CONSUMER
test 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
broker-b 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
TopicTest 0 0.00 1000 NO_CONSUMER
broker-a 0 0.00 0 NO_CONSUMER
TBW102 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER

消息相关命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# 发送消息
[root@8fptpfxk957bjm src]# mqadmin sendMessage -n localhost:9876 -t test -p "this is test"
#Broker Name #QID #Send Result #MsgId
broker-a 14 SEND_OK 7F0000015DFD6FF3C5B52B7569C90000

# 消费消息
[root@8fptpfxk957bjm src]# mqadmin consumeMessage -n localhost:9876 -t test
MessageQueue [topic=test, brokerName=broker-a, queueId=15] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=15] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=13] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=13] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=11] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=11] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=9] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=9] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=7] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=7] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=5] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=5] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=3] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=3] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=1] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=1] print msg finished. status=NO_NEW_MSG, offset=0
Consume ok
MSGID: 7F0000015DFD6FF3C5B52B7569C90000 MessageExt [brokerName=broker-a, queueId=14, storeSize=182, queueOffset=0, sysFlag=0, bornTimestamp=1623205915082, bornHost=/172.31.0.21:52474, storeTimestamp=1623205915092, storeHost=/172.31.0.21:10911, msgId=AC1F001500002A9F0000000000018C48, commitLogOffset=101448, bodyCRC=4604200, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='test', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, UNIQ_KEY=7F0000015DFD6FF3C5B52B7569C90000, CLUSTER=DefaultCluster, WAIT=true}, body=[116, 104, 105, 115, 32, 105, 115, 32, 116, 101, 115, 116], transactionId='null'}] BODY: this is test
MessageQueue [topic=test, brokerName=broker-a, queueId=14] print msg finished. status=NO_NEW_MSG, offset=1
MessageQueue [topic=test, brokerName=broker-b, queueId=14] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=12] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=12] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=10] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=10] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=8] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=8] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=6] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=6] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=4] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=4] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=2] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=2] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-a, queueId=0] print msg finished. status=NO_NEW_MSG, offset=0
MessageQueue [topic=test, brokerName=broker-b, queueId=0] print msg finished. status=NO_NEW_MSG, offset=0

# 将读写队列配置为4
[root@8fptpfxk957bjm rocketmq]# mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t test -r 4 -w 4 2>/dev/null
create topic to 172.31.0.21:10911 success.
create topic to 172.31.0.22:10911 success.
TopicConfig [topicName=test, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

# 打印Topic订阅关系、TPS、积累量、24h读写总量等信息
[root@8fptpfxk957bjm src]# mqadmin statsAll -n localhost:9876 2>/dev/null
#Topic #Consumer Group #Accumulation #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC 0 0.00 0 NO_CONSUMER
test 0 0.00 0 NO_CONSUMER
BenchmarkTest 0 0.00 0 NO_CONSUMER
OFFSET_MOVED_EVENT 0 0.00 0 NO_CONSUMER
TBW102 0 0.00 0 NO_CONSUMER
SELF_TEST_TOPIC 0 0.00 0 NO_CONSUMER
DefaultCluster 0 0.00 0 NO_CONSUMER
SCHEDULE_TOPIC_XXXX 0 0.00 0 NO_CONSUMER
DefaultCluster_REPLY_TOPIC 0 0.00 0 NO_CONSUMER
broker-b 0 0.00 0 NO_CONSUMER
TopicTest please_rename_unique_group_name_ 605 0.00 0.00 1000 2586
broker-a 0 0.00 0 NO_CONSUMER

# 查看消费的情况,不带-g是否总体的情况
[root@8fptpfxk957bjm rocketmq]# mqadmin consumerProgress -n localhost:9876 2>/dev/null
#Group #Count #Version #Type #Model #TPS #Diff Total
please_rename_unique_group_name_ 0 OFFLINE 0 0
[root@8fptpfxk957bjm rocketmq]#

# 查看具体consumer的消费情况
[root@8fptpfxk957bjm rocketmq]# mqadmin consumerProgress -n localhost:9876 -g please_rename_unique_group_name_4 2>/dev/null
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%please_rename_unique_grou broker-a 0 0 0 N/A 0 N/A
%RETRY%please_rename_unique_grou broker-b 0 0 0 N/A 0 N/A
TopicTest broker-a 0 125 125 N/A 0 2021-06-08 17:19:50
TopicTest broker-a 1 125 125 N/A 0 2021-06-08 17:19:50
TopicTest broker-a 2 125 125 N/A 0 2021-06-08 17:19:50
TopicTest broker-a 3 125 125 N/A 0 2021-06-08 17:19:50
TopicTest broker-b 0 125 125 N/A 0 2021-06-08 17:19:49
TopicTest broker-b 1 125 125 N/A 0 2021-06-08 17:19:49
TopicTest broker-b 2 125 125 N/A 0 2021-06-08 17:19:49
TopicTest broker-b 3 125 125 N/A 0 2021-06-08 17:19:49

Consume TPS: 0.00
Diff Total: 0

# 以broker为角度,查看consume的消费情况
[root@8fptpfxk957bjm rocketmq]# mqadmin brokerConsumeStats -n localhost:9876 -b 172.31.0.22:10911 2>/dev/null
#Topic #Group #Broker Name #QID #Broker Offset #Consumer Offset #Diff #LastTime
TopicTest please_rename_unique_group_name_4 broker-b 0 125 125 0 2021-06-08 17:19:49
TopicTest please_rename_unique_group_name_4 broker-b 1 125 125 0 2021-06-08 17:19:49
TopicTest please_rename_unique_group_name_4 broker-b 2 125 125 0 2021-06-08 17:19:49
TopicTest please_rename_unique_group_name_4 broker-b 3 125 125 0 2021-06-08 17:19:49

Diff Total: 0

# 创建订阅组
[root@8fptpfxk957bjm rocketmq]# mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g my_consumer_01
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create subscription group to 172.31.0.21:10911 success.
create subscription group to 172.31.0.22:10911 success.
SubscriptionGroupConfig [groupName=my_consumer_01, consumeEnable=true, consumeFromMinEnable=false, consumeBroadcastEnable=false, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]

可以使用命令创建订阅组,但是没有找到方法怎么样去跟topic关联起来。所以在rocketmq里面,不需要创建消费组。

参考资料

RocketMQ学习之安装部署及基础讲解

Apache RocketMQ开发者指南

RocketMQ-Dledger集群搭建

  • 本文作者: wumingx
  • 本文链接: https://www.wumingx.com/linux/rocketMQ.html
  • 本文主题: rocketMQ集群化部署以及常见运维命令
  • 版权声明: 本站所有文章除特别声明外,转载请注明出处!如有侵权,请联系我删除。
0%