rabbitmq迁移rocketmq使用指引
JeecgBoot v3.6.4+ 支持
目前仅支持rocketmq 4.x,暂不支持rocketmq 5.x
文档
依赖引用
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-cloud-test-rocketmq</artifactId>
<version>${jeecgboot.version}</version>
</dependency>
添加配置
rocketmq:
name-server: jeecg-boot-rocketmq:9876
producer:
group: jeecg-group
sendMessageTimeout: 300000
tls-enable: false
spring:
cloud:
stream:
rocketmq:
binder:
name-server: ${rocketmq.name-server} # RocketMQ Namesrv 地址
rabbitmq往rocketmq转换
考虑到迁移切换代码修改量问题,本次在rocketmq-starter中封装的client与原有rabbitmq-starter的client保持同名,都为RabbitMqClient,并且包结构也保持了一致,最大程度减轻了迁移时产生的工作量
发生普通消息
原rabbitmq
@Autowired
private RabbitMqClient rabbitMqClient;
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map);
迁移至rocketmq
@Autowired
private RabbitMqClient rabbitMqClient;
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER, map);
发送延迟消息
原rabbitmq
@Autowired
private RabbitMqClient rabbitMqClient;
// 延迟10秒
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,10);
迁移至rocketmq
@Autowired
private RabbitMqClient rabbitMqClient;
/**
* 延迟消息, delayLevel取如下列表中的值
* Level 1: 1s(1秒)
* Level 2: 5s(5秒)
* Level 3: 10s(10秒)
* Level 4: 30s(30秒)
* Level 5: 1m(1分钟)
* Level 6: 2m(2分钟)
* Level 7: 3m(3分钟)
* Level 8: 4m(4分钟)
* Level 9: 5m(5分钟)
* Level 10: 6m(6分钟)
* Level 11: 7m(7分钟)
* Level 12: 8m(8分钟)
* Level 13: 9m(9分钟)
* Level 14: 10m(10分钟)
* Level 15: 20m(20分钟)
* Level 16: 30m(30分钟)
* Level 17: 1h(1小时)
* Level 18: 2h(2小时)
*/
rabbitMqClient.sendMessage(CloudConstant.MQ_JEECG_PLACE_ORDER_TIME, map,2);
发生总线消息
原rabbitmq
@Autowired
private RabbitMqClient rabbitMqClient;
//rabbitmq消息总线测试
BaseMap params = new BaseMap();
params.put("orderId", "123456");
rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params);
迁移至rocketmq
@Autowired
private RabbitMqClient rabbitMqClient;
//rocketmq消息总线测试
BaseMap params = new BaseMap();
params.put("orderId", "123456");
rabbitMqClient.publishEvent(CloudConstant.MQ_DEMO_BUS_EVENT, params);
普通消息监听/延迟消息监听
@RabbitListener(queues = CloudConstant.MQ_JEECG_PLACE_ORDER) // 监听队列名称
@RabbitComponent(value = "helloReceiver1")
public class HelloReceiver1 extends BaseRabbiMqHandler<BaseMap> {
@Autowired
private RestTemplate restTemplate;
@RabbitHandler
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener<BaseMap>() {
@Override
public void handler(BaseMap map, Channel channel) {
//业务处理
String orderId = map.get("orderId").toString();
log.info("【我是处理人1】 MQ Receiver1,orderId : " + orderId);
}
});
}
}
迁移至rocketmq
@Component
// 监听topic名称,消费组
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
public class HelloReceiver1 implements RocketMQListener<BaseMap> {
public void onMessage(BaseMap baseMap) {
log.info("helloReceiver1接收消息:" + baseMap);
}
}
总线消息监听
@Component(CloudConstant.MQ_DEMO_BUS_EVENT)
public class DemoBusEvent implements JeecgBusEventHandler {
@Override
public void onMessage(EventObj obj) {
if (ObjectUtil.isNotEmpty(obj)) {
BaseMap baseMap = obj.getBaseMap();
String orderId = baseMap.get("orderId");
log.info("业务处理----订单ID:" + orderId);
}
}
}
迁移至rocketmq
@Component(CloudConstant.MQ_DEMO_BUS_EVENT)
public class DemoBusEvent implements JeecgBusEventHandler {
@Override
public void onMessage(EventObj obj) {
if (ObjectUtil.isNotEmpty(obj)) {
BaseMap baseMap = obj.getBaseMap();
String orderId = baseMap.get("orderId");
log.info("业务处理----订单ID:" + orderId);
}
}
}
至此,所以需要迁移的内容已经全部讲完,除普通消息/延迟消息的监听消费需要改造外,其它的操作均不需要进行修改。
rocketmq 消费组
在监听消费注解@RocketMQMessageListener
中,除topic之外,还存在一个必填参数consumeGroup,消费组名称,rocketmq默认不提供消费组配置,需要自己指定消费组名称,不过在监听消费注解存,存在满足普通消息场景的配置,所以只需要指定一个消费名称即可,也可以通过rocketmq.consumer
指定消费组的配置,使rocketmq.consumer.group
与注解中的consumerGroup名称匹配即可使用配置文件中的消费配置,不同组名称配置不共享,如下配置为多组消费组消费一个topic的配置
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver1")
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver2")
@RocketMQMessageListener(topic = CloudConstant.MQ_JEECG_PLACE_ORDER, consumerGroup = "helloReceiver3")
rocketmq生产组
目前生产配置仅支持rocketmq.produce
进行配置,如需要多生产组,可修改不同应用之间的生产组名称
rocketmq客户端 API
/**
* 同步发送
* @param topic
* @param payload
*/
public void sendMessage(String topic, Object payload)
/**
* 同步批量发送
* @param topic
* @param payload
* @return
* @param <T>
*/
public <T> SendResult sendMessage(String topic, Collection<T> payload)
/**
* 异步发送
* @param topic
* @param payload
* @param callback
*/
public void sendAsyncMessage(String topic, Object payload, SendCallback callback)
/**
* 异步批量发送
* @param topic
* @param payload
* @param sendCallback
* @param <T>
*/
public <T> void sendAsyncMessage(String topic, Collection<T> payload, SendCallback sendCallback)
/**
* 同步发送顺序消息
* @param topic
* @param payload
* @param hashKey
*/
public void sendOrderlyMessage(String topic, Object payload, String hashKey)
/**
* 异步发送顺序消息
* @param topic
* @param payload
* @param hashKey
* @param callback
*/
public void sendAsyncOrderlyMessage(String topic, Object payload, String hashKey, SendCallback callback)
/**
* Oneway消息
* @param topic
* @param payload
*/
public void sendOneway(String topic, Object payload)
/**
* 顺序Oneway消息
* @param topic
* @param payload
* @param hashKey
*/
public void sendOnewayOrderly(String topic, Object payload, String hashKey)
/**
* 事务消息
* @param topic
* @param payload
* @param arg
*/
public void sendMessageInTransaction(String topic, Object payload, Object arg)
/**
* pull模式
* 拉取消息
* @param clazz
* @return
* @param <T>
*/
public <T> List<T> receive(Class<T> clazz)
/**
* 延迟消息, delayLevel取如下列表中的值,等后续rocket-spring升到2.3.0时,可以设置延迟多少秒,目前适配版本只能设置延迟等级
* Level 1: 1s(1秒)
* Level 2: 5s(5秒)
* Level 3: 10s(10秒)
* Level 4: 30s(30秒)
* Level 5: 1m(1分钟)
* Level 6: 2m(2分钟)
* Level 7: 3m(3分钟)
* Level 8: 4m(4分钟)
* Level 9: 5m(5分钟)
* Level 10: 6m(6分钟)
* Level 11: 7m(7分钟)
* Level 12: 8m(8分钟)
* Level 13: 9m(9分钟)
* Level 14: 10m(10分钟)
* Level 15: 20m(20分钟)
* Level 16: 30m(30分钟)
* Level 17: 1h(1小时)
* Level 18: 2h(2小时)
* @param topic
* @param payload
* @param delayLevel
* @return
*/
public SendResult sendMessage(String topic, Object payload, Integer delayLevel)
/**
* 发送远程事件
*
* @param handlerName
* @param baseMap
*/
public void publishEvent(String handlerName, BaseMap baseMap)