# Build from source (alternatively, use the prebuilt binary release directly).
unzip rocketmq-all-4.9.0-source-release.zip
cd rocketmq-all-4.9.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.9.0/rocketmq-4.9.0

# Start the NameServer in the background.
nohup sh bin/mqnamesrv &

# Start the Broker in the background, registering with the NameServer at localhost:9876.
nohup sh bin/mqbroker -n localhost:9876 &

# Send and consume messages with the bundled quickstart examples.
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

# Shut down the services (broker first, then the NameServer).
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
# ACL account definitions: one restricted account and one admin account.
accounts:
  - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress:
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
      - TopicTest=PUB
    groupPerms:
      # the group should convert to retry topic
      - test=DENY
  - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true
# Author's note: with version 4.9.0, the ACL did not take effect after this file was modified.
# NOTE(review): ACL checks are presumably only enforced when the broker is started with
# aclEnable=true in its broker configuration — confirm against the broker setup used here.
常用命令
1 2 3 4 5
# Create a topic.
# -n is the NameServer address (port 9876); -c selects the broker cluster to create it on.
# To target a single broker instead, use -b <brokerAddr> with the broker's own
# listen port (10911 by default), not the NameServer port.
sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicA

# Create a consumer (subscription) group.
# updateSubGroup takes the group name with -g; it has no -t option.
sh mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g SubGroupA
// Push consumer for group "test" on topic "TopicTest"; the RPC hook comes from
// getAclRPCHook() (defined elsewhere — presumably supplies the ACL credentials).

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer =
    new DefaultMQPushConsumer("test", getAclRPCHook(), new AllocateMessageQueueAveragely());

// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");

// Subscribe to one or more topics to consume ("*" is the sub-expression passed for TopicTest).
consumer.subscribe("TopicTest", "*");

// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        // Log the consuming thread and the received batch, then acknowledge success.
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});