rabitmq消息队列集成2.4+
最后更新于:2022-04-02 07:10:24
# rabbitmq消息队列集成2.4+
> JeecgBoot 消息队列使用rabbitmq,本文以jeecg-demo模块为例介绍如何集成消息队列
> 应用场景:功能解耦、流量削峰、异步处理
[TOC]
前提:JeecgBoot启动会自动创建两个交换机 `jeecg.delayed.exchange` `jeecg.direct.exchange`
如果没有生成,可以手工创建
## 第一步:引入消息队列starter依赖
~~~
org.jeecgframework.boot
jeecg-boot-starter-rabbitmq
~~~
## 第二步:推送消息API介绍
**方法说明**
| 参数名 | 参数描述 | 参数类型
| --- | --- |--- |
| queueName | 队列名称(队列自动创建,无需手动)| String
| handlerName| 参数|自定义消息处理器beanName
| params| 参数|Object
| expiration| 延迟时间| int(毫秒)
```
//立即发送
void sendMessage(String queueName, Object params)
//发送延时消息
void sendMessage(String queueName, Object params, Integer expiration)
//发送远程消息
void publishEvent(String handlerName, BaseMap params)
```
## 第三步: 编写示例(简单3步完成消息的发送和接收)
1. **注入消息发送客户端**
```
@Autowired
private RabbitMqClient rabbitMqClient;
```
2. **发送消息示例代码**
```
BaseMap map = new BaseMap();
map.put("orderId", "12345");
rabbitMqClient.sendMessage("test", map);
//延迟10秒发送
//rabbitMqClient.sendMessage("test", map,10000);
```
3. **编写消息监听监听器**
定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中)
~~~
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
@Slf4j
@RabbitListener(queues = "test3")
@RabbitComponent(value = "testListener3")
public class DemoRabbitMqListener3 extends BaseRabbiMqHandler {
@RabbitHandler
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId").toString();
log.info("业务处理3:orderId:" + orderId);
}
});
}
}
~~~
或者
~~~
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
@Slf4j
@RabbitComponent(value = "testListener2")
public class DemoRabbitMqListener2 extends BaseRabbiMqHandler {
@RabbitListener(queues = "test2")
public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(baseMap, deliveryTag, channel, new MqListener() {
@Override
public void handler(BaseMap map, Channel channel) {
String orderId = map.get("orderId");
log.info("业务处理2:orderId:" + orderId);
}
});
}
}
~~~
N个接受者效果图
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/d0/bb/d0bbd0422f37eccf2786c2827516d3e0_1307x1242.png)
';