RocketMQ入门使用

最近需要接入的数据源是RocketMQ,这里记录一下RocketMQ的安装、配置和基本使用方法

安装部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Build from source (a prebuilt binary package can be used instead, in which
# case skip the unzip/mvn steps and cd into the extracted binary directory).
unzip rocketmq-all-4.9.0-source-release.zip
cd rocketmq-all-4.9.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.9.0/rocketmq-4.9.0

# Start the NameServer in the background (nohup keeps it running after logout)
nohup sh bin/mqnamesrv &

# Start the Broker in the background; -n is the NameServer address it registers with
nohup sh bin/mqbroker -n localhost:9876 &

# Send and consume messages with the bundled quickstart examples;
# NAMESRV_ADDR is exported first so the example clients can find the NameServer
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

# Shut the services down (broker first, then the NameServer)
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

ACL配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# plain_acl.yml参数含义
globalWhiteRemoteAddresses: 全局白名单IP
accessKey: AccessKey用户名
secretKey: SecretKey密码
whiteRemoteAddress: 用户白名单IP
admin: 是否管理员账号
defaultTopicPerm: 默认Topic权限
defaultGroupPerm: 默认消费组权限
topicPerms: 各个Topic的权限
groupPerms: 各个消费组的权限

# 权限定义
DENY: 拒绝
ANY: 全权限
PUB: 发送权限
SUB: 订阅权限

# 修改conf/broker.conf,添加aclEnable
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
aclEnable = true

# 修改conf/plain_acl.yml
# Global IP whitelist; left empty here (the sample entries stay commented out).
globalWhiteRemoteAddresses:
#- 10.10.103.*
#- 192.168.0.*

# One list item per account. Every field of an account must be indented under
# its "- accessKey" entry, otherwise the file is not valid YAML.
accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  # NOTE(review): a consumer in group "test" subscribing to TopicTest presumably
  # needs SUB on both the topic and the group; with TopicTest=PUB and test=DENY
  # it should be rejected once ACL is actually enforced - confirm this is intended.
  topicPerms:
  - TopicTest=PUB
  groupPerms:
  # the group should convert to retry topic
  - test=DENY

- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

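# 这里我使用4.9.0版本,修改配置文件后,ACL并未生效
# 可能的原因:前面启动Broker时没有通过 -c conf/broker.conf 指定配置文件,aclEnable = true 没有被加载
# 可尝试改用 nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf & 重新启动Broker后再验证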

常用命令

1
2
3
4
5
# Create a topic.
#   -n  NameServer address (localhost:9876 is the NameServer, not a broker, so -b is wrong here)
#   -c  cluster to create the topic on (brokerClusterName from conf/broker.conf)
#   -t  topic name
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicA

# Create a consumer (subscription) group.
#   -g  consumer group name (updateSubGroup takes the group via -g, not -t)
sh bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g SubGroupA

消费代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Minimal push-consumer example that passes ACL credentials to the broker.
 *
 * <p>Connects through the NameServer at {@code localhost:9876}, subscribes to every tag of
 * {@code TopicTest} as consumer group {@code test}, and prints each batch of messages it receives.
 */
public class RocketMQConsumer {

    /** Consumer group this client joins. */
    private static final String CONSUMER_GROUP = "test";

    /** NameServer address used to discover brokers. */
    private static final String NAMESRV_ADDR = "localhost:9876";

    /** Topic to consume; the subscription expression "*" selects all tags. */
    private static final String TOPIC = "TopicTest";

    /** ACL account name; must exist as an accessKey in the broker's plain_acl.yml. */
    private static final String ACCESS_KEY = "RocketMQ";

    /**
     * ACL secret for {@link #ACCESS_KEY}. It has to be identical to the secretKey configured for
     * that account on the broker (12345678 in the sample plain_acl.yml); the previous value
     * "1234567" did not match it.
     */
    private static final String SECRET_KEY = "12345678";

    /**
     * Creates the consumer, registers a listener that prints incoming messages, and starts it.
     *
     * @param args unused
     * @throws InterruptedException declared for interface compatibility; not thrown by this body
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with the consumer group name, the ACL hook and an averaging queue allocation strategy.
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer(CONSUMER_GROUP, getAclRPCHook(), new AllocateMessageQueueAveragely());

        // Specify name server addresses.
        consumer.setNamesrvAddr(NAMESRV_ADDR);

        // Subscribe to one or more topics to consume.
        consumer.subscribe(TOPIC, "*");

        // Register the callback executed on arrival of messages fetched from brokers.
        // An anonymous class is used on purpose: registerMessageListener is overloaded, so an
        // untyped lambda would need an explicit cast to MessageListenerConcurrently.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Acknowledge the whole batch as successfully consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

    /**
     * Builds the RPC hook that carries the ACL credentials for this client.
     *
     * @return a hook wrapping {@link #ACCESS_KEY} and {@link #SECRET_KEY}
     */
    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
    }
}