跳到主要内容

RabbitMQ消息队列集成

采用rabbitmq实现消息队列。应用场景:功能解耦、流量削峰、异步处理

示例代码:jeecg-cloud-module\jeecg-cloud-test\jeecg-cloud-test-rabbitmq

前提:搭建rabbitmq服务

  • 3.创建两个交换机

如果程序未自动创建这两个交换机,请手工创建

交换机类型
jeecg.delayed.exchangex-delayed-message
jeecg.direct.exchangedirect

效果图:

集成rabbitmq的starter

第一步:集成引入消息队列starter依赖

<!--引入rabbitmq Starter-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-boot-starter-rabbitmq</artifactId>
</dependency>

第二步:推送Mq消息

为了简化MQ使用做了封装,提供推送消息的简易工具类

@Autowired
private RabbitMqClient rabbitMqClient;

常用方法

  • 立即发送 void sendMessage(String queueName, Object params)
  • 发送延时消息 void sendMessage(String queueName, Object params, Integer expiration)
  • 发送远程消息 void publishEvent(String handlerName, BaseMap params)

方法参数统一说明

参数名参数描述参数类型
queueName队列名称(队列自动创建,无需手动)String
handlerName参数自定义消息处理器beanName
params参数Object
expiration延迟时间int(毫秒)

queueName不需要去MQ中创建,jeecg做了封装会自动创建。

第三步: 编写完整示例

简单3步完成消息的发送和接收

  • 1.引入RabbitMqClient
@Autowired
private RabbitMqClient rabbitMqClient;
  • 2.发送消息
BaseMap map = new BaseMap();
map.put("orderId", "12345");
// test2是队列名字,程序自动创建
rabbitMqClient.sendMessage("test2", map);
// 延迟10秒发送
rabbitMqClient.sendMessage("test2", map,10000);
  • 3.接受消息

使用注解@RabbitListener(queues = "test2")定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)

监听写法一

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

@Slf4j
@RabbitListener(queues = "test2")
@RabbitComponent(value = "testListener2")
public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<BaseMap> {

@RabbitHandler
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener<BaseMap>() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId").toString();
log.info("业务处理3:orderId:" + orderId);
}
});
}
}

或者

监听写法二

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

@Slf4j
@RabbitComponent(value = "testListener2")
public class DemoRabbitMqListener2 extends BaseRabbiMqHandler<BaseMap> {

@RabbitListener(queues = "test2")
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener<BaseMap>() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId");
log.info("业务处理2:orderId:" + orderId);
}
});
}

}

N个接受者效果图