RabbitMQ消息队列集成
示例代码:采用rabbitmq实现消息队列。应用场景:功能解耦、流量削峰、异步处理
jeecg-cloud-module\jeecg-cloud-test\jeecg-cloud-test-rabbitmq
前提:搭建rabbitmq服务
-
1.安装RabbitMq软件
-
2.访问下RabbitM后台
- 3.创建两个交换机
如果程序未自动创建这两个交换机,请手工创建
交换机 | 类型 |
---|---|
jeecg.delayed.exchange | x-delayed-message |
jeecg.direct.exchange | direct |
效果图:
集成rabbitmq的starter
第一步:集成引入消息队列starter依赖
<!--引入rabbitmq Starter-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>jeecg-boot-starter-rabbitmq</artifactId>
</dependency>
第二步:推送Mq消息
为了简化MQ使用做了封装,提供推送消息的简易工具类
@Autowired
private RabbitMqClient rabbitMqClient;
常用方法
- 立即发送 void sendMessage(String queueName, Object params)
- 发送延时消息 void sendMessage(String queueName, Object params, Integer expiration)
- 发送远程消息 void publishEvent(String handlerName, BaseMap params)
方法参数统一说明
参数名 | 参数描述 | 参数类型 |
---|---|---|
queueName | 队列名称(队列自动创建,无需手动) | String |
handlerName | 参数 | 自定义消息处理器beanName |
params | 参数 | Object |
expiration | 延迟时间 | int(毫秒) |
queueName不需要去MQ中创建,jeecg做了封装会自动创建。
第三步: 编写完整示例
简单3步完成消息的发送和接收
- 1.引入RabbitMqClient
@Autowired
private RabbitMqClient rabbitMqClient;
- 2.发送消息
BaseMap map = new BaseMap();
map.put("orderId", "12345");
// test2是队列名字,程序自动创建
rabbitMqClient.sendMessage("test2", map);
// 延迟10秒发送
rabbitMqClient.sendMessage("test2", map,10000);
- 3.接受消息
使用注解@RabbitListener(queues = "test2")
定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
监听写法一
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
@Slf4j
@RabbitListener(queues = "test2")
@RabbitComponent(value = "testListener2")
public class DemoRabbitMqListener3 extends BaseRabbiMqHandler<BaseMap> {
@RabbitHandler
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener<BaseMap>() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId").toString();
log.info("业务处理3:orderId:" + orderId);
}
});
}
}
或者
监听写法二
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
@Slf4j
@RabbitComponent(value = "testListener2")
public class DemoRabbitMqListener2 extends BaseRabbiMqHandler<BaseMap> {
@RabbitListener(queues = "test2")
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener<BaseMap>() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId");
log.info("业务处理2:orderId:" + orderId);
}
});
}
}
N个接受者效果图