RocketMQ消息队列集成
前提:搭建RocketMQ服务
目前仅支持rocketmq 4.x,暂不支持rocketmq 5.x
快速安装参考文档 Docker安装RocketMQ
JeecgBoot集成 RocketMQ
MQ集成与配置
依赖引用
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-cloud-test-rocketmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>
添加配置
rocketmq:
name-server: jeecg-boot-rocketmq:9876
producer:
group: jeecg-group
sendMessageTimeout: 300000
tls-enable: false
spring:
cloud:
stream:
rocketmq:
binder:
name-server: ${rocketmq.name-server} # RocketMQ Namesrv 地址
MQ的使用
发生普通消息
@Autowired
private RabbitMqClient rabbitMqClient;
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map);
发送延迟消息
@Autowired
private RabbitMqClient rabbitMqClient;
/**
* 延迟消息, delayLevel取如下列表中的值
* Level 1: 1s(1秒)
* Level 2: 5s(5秒)
* Level 3: 10s(10秒)
* Level 4: 30s(30秒)
* Level 5: 1m(1分钟)
* Level 6: 2m(2分钟)
* Level 7: 3m(3分钟)
* Level 8: 4m(4分钟)
* Level 9: 5m(5分钟)
* Level 10: 6m(6分钟)
* Level 11: 7m(7分钟)
* Level 12: 8m(8分钟)
* Level 13: 9m(9分钟)
* Level 14: 10m(10分钟)
* Level 15: 20m(20分钟)
* Level 16: 30m(30分钟)
* Level 17: 1h(1小时)
* Level 18: 2h(2小时)
*/
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,2);
定义普通消息监听和延迟消息监听
@Component
// 监听topic名称,消费组
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
public class HelloReceiver1 implements RocketMQListener<BaseMap> {
public void onMessage(BaseMap baseMap) {
log.info("helloReceiver1接收消息:" + baseMap);
}
}
消息总线的使用
发生总线消息
@Autowired
private RabbitMqClient rabbitMqClient;
//rocketmq消息总线测试
BaseMap params = new BaseMap();
params.put("orderId", "123456");
rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params);
总线消息监听
@Component(CloudConstant.MQ_DEMO_BUS_EVENT)
public class DemoBusEvent implements JeecgBusEventHandler {
@Override
public void onMessage(EventObj obj) {
if (ObjectUtil.isNotEmpty(obj)) {
BaseMap baseMap = obj.getBaseMap();
String orderId = baseMap.get("orderId");
log.info("业务处理----订单ID:" + orderId);
}
}
}
消费组
rocketmq 消费组
在监听消费注解@RocketMQMessageListener
中,除topic之外,还存在一个必填参数consumeGroup,消费组名称,rocketmq默认不提供消费组配置,需要自己指定消费组名称,不过在监听消费注解存,存在满足普通消息场景的配置,所以只需要指定一个消费名称即可,也可以通过rocketmq.consumer
指定消费组的配置,使rocketmq.consumer.group
与注解中的consumerGroup名称匹配即可使用配置文件中的消费配置,不同组名称配置不共享,如下配置为多组消费组消费一个topic的配置
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver2")
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver3")
rocketmq生产组
目前生产配置仅支持rocketmq.produce
进行配置,如需要多生产组,可修改不同应用之间的生产组名称
rocketmq客户端 API
/**
* 同步发送
* @param topic
* @param payload
*/
public void sendMessage(String topic, Object payload)
/**
* 同步批量发送
* @param topic
* @param payload
* @return
* @param <T>
*/
public <T> SendResult sendMessage(String topic, Collection<T> payload)
/**
* 异步发送
* @param topic
* @param payload
* @param callback
*/
public void sendAsyncMessage(String topic, Object payload, SendCallback callback)
/**
* 异步批量发送
* @param topic
* @param payload
* @param sendCallback
* @param <T>
*/
public <T> void sendAsyncMessage(String topic, Collection<T> payload, SendCallback sendCallback)
/**
* 同步发送顺序消息
* @param topic
* @param payload
* @param hashKey
*/
public void sendOrderlyMessage(String topic, Object payload, String hashKey)
/**
* 异步发送顺序消息
* @param topic
* @param payload
* @param hashKey
* @param callback
*/
public void sendAsyncOrderlyMessage(String topic, Object payload, String hashKey, SendCallback callback)
/**
* Oneway消息
* @param topic
* @param payload
*/
public void sendOneway(String topic, Object payload)
/**
* 顺序Oneway消息
* @param topic
* @param payload
* @param hashKey
*/
public void sendOnewayOrderly(String topic, Object payload, String hashKey)
/**
* 事务消息
* @param topic
* @param payload
* @param arg
*/
public void sendMessageInTransaction(String topic, Object payload, Object arg)
/**
* pull模式
* 拉取消息
* @param clazz
* @return
* @param <T>
*/
public <T> List<T> receive(Class<T> clazz)
/**
* 延迟消息, delayLevel取如下列表中的值,等后续rocket-spring升到2.3.0时,可以设置延迟多少秒,目前适配版本只能设置延迟等级
* Level 1: 1s(1秒)
* Level 2: 5s(5秒)
* Level 3: 10s(10秒)
* Level 4: 30s(30秒)
* Level 5: 1m(1分钟)
* Level 6: 2m(2分钟)
* Level 7: 3m(3分钟)
* Level 8: 4m(4分钟)
* Level 9: 5m(5分钟)
* Level 10: 6m(6分钟)
* Level 11: 7m(7分钟)
* Level 12: 8m(8分钟)
* Level 13: 9m(9分钟)
* Level 14: 10m(10分钟)
* Level 15: 20m(20分钟)
* Level 16: 30m(30分钟)
* Level 17: 1h(1小时)
* Level 18: 2h(2小时)
* @param topic
* @param payload
* @param delayLevel
* @return
*/
public SendResult sendMessage(String topic, Object payload, Integer delayLevel)
/**
* 发送远程事件
*
* @param handlerName
* @param baseMap
*/
public void publishEvent(String handlerName, BaseMap baseMap)