rabitmq消息队列集成2.4+

最后更新于:2022-04-02 07:10:24

# rabbitmq消息队列集成2.4+ > JeecgBoot 消息队列使用rabbitmq,本文以jeecg-demo模块为例介绍如何集成消息队列 > 应用场景:功能解耦、流量削峰、异步处理 [TOC] 前提:JeecgBoot启动会自动创建两个交换机 `jeecg.delayed.exchange` `jeecg.direct.exchange` 如果没有生成,可以手工创建 ## 第一步:引入消息队列starter依赖 ~~~ org.jeecgframework.boot jeecg-boot-starter-rabbitmq ~~~ ## 第二步:推送消息API介绍 **方法说明** | 参数名 | 参数描述 | 参数类型 | --- | --- |--- | | queueName | 队列名称(队列自动创建,无需手动)| String | handlerName| 参数|自定义消息处理器beanName | params| 参数|Object | expiration| 延迟时间| int(毫秒) ``` //立即发送 void sendMessage(String queueName, Object params) //发送延时消息 void sendMessage(String queueName, Object params, Integer expiration) //发送远程消息 void publishEvent(String handlerName, BaseMap params) ``` ## 第三步: 编写示例(简单3步完成消息的发送和接收) 1. **注入消息发送客户端** ``` @Autowired private RabbitMqClient rabbitMqClient; ``` 2. **发送消息示例代码** ``` BaseMap map = new BaseMap(); map.put("orderId", "12345"); rabbitMqClient.sendMessage("test", map); //延迟10秒发送 //rabbitMqClient.sendMessage("test", map,10000); ``` 3. **编写消息监听监听器** 定义接收者(可以定义N个接受者,消息会均匀的发送到N个接收者中) ~~~ import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler; import org.jeecg.boot.starter.rabbitmq.listenter.MqListener; import org.jeecg.common.annotation.RabbitComponent; import org.jeecg.common.base.BaseMap; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; @Slf4j @RabbitListener(queues = "test3") @RabbitComponent(value = "testListener3") public class DemoRabbitMqListener3 extends BaseRabbiMqHandler { @RabbitHandler public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { super.onMessage(baseMap, deliveryTag, channel, new MqListener() { @Override public void handler(BaseMap map, Channel channel) { String orderId = map.get("orderId").toString(); log.info("业务处理3:orderId:" + orderId); } }); } } ~~~ 或者 ~~~ import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler; import org.jeecg.boot.starter.rabbitmq.listenter.MqListener; import org.jeecg.common.annotation.RabbitComponent; import org.jeecg.common.base.BaseMap; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; @Slf4j @RabbitComponent(value = "testListener2") public class DemoRabbitMqListener2 extends BaseRabbiMqHandler { @RabbitListener(queues = "test2") public void onMessage(BaseMap baseMap, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { super.onMessage(baseMap, deliveryTag, channel, new MqListener() { @Override public void handler(BaseMap map, Channel channel) { String orderId = map.get("orderId"); log.info("业务处理2:orderId:" + orderId); } }); } } ~~~ N个接受者效果图 ![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/d0/bb/d0bbd0422f37eccf2786c2827516d3e0_1307x1242.png)
';