【9】 – 与Spring集成
最后更新于:2022-04-01 15:59:00
RabbitMQ入门教程 For Java【9】 - 与Spring集成
简介:
RabbitMQ在与Spring集成我做了两个项目,一个项目是消息生产者,负责发送消息,另外一个是消息消费者,负责监听消息。大致的流程图如下:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da03e4d.jpg)
项目环境:
Windows7 64bit
Eclipse Kepler SR2
JDK 1.7
Tomcat 7
RabbitMQ 3.6.0
项目源码地址:
生产者:https://github.com/chwshuang/spring-rabbitmq-producer
消费者:https://github.com/chwshuang/spring-rabbitmq-customer
# 生产者:
与Spring集成的项目使用的是Maven,只需要一个依赖配置就搞定:
~~~
lt;dependency>
org.springframework.amqp
spring-rabbit
1.3.5.RELEASE
lt;/dependency>
~~~
消息生产者在Spring的配置也比较简单,只需要一个连接工厂和一个连接模版类就搞定了。
生产者在Spring中使用时,只需要定义一个服务接口实现类即可。
~~~
@Service("rabbitService")
public class RabbitServiceImpl {
private static Logger log = LoggerFactory.getLogger(RabbitServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param msg 消息内容
* @param routingKey 路由关键字
* void
*/
public void setMessage(String msg, String routingKey) {
rabbitTemplate.convertAndSend(routingKey, msg);
log.info("rabbitmq--发送消息完成: routingKey[{}]-msg[{}]", routingKey, msg);
}
}
~~~
生产者端还需要创建一个测试页面,通过Ajax技术发送消息到生产者端的控制层,由控制层调用消息服务层发送消息。
~~~
<%@ page contentType="text/html;charset=UTF-8" language="java"%>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<script src="http://cdn.bootcss.com/jquery/1.11.3/jquery.min.js"></script>
<title>测试</title>
<script type="text/javascript">
function sendMsg(){
var t = $("#type").val();
//alert(t);
$.post('/producer/rabbit/setMessage',{msg:'hello world '+t+' rabbit!',type:t}, function(result){
if(result.success){
//alert("发送成功");
}else{
alert("发送失败");
}
},'json');
}
</script>
</head>
<body>
<h1>发送消息</h1><hr>
<select id="type" >
<option selected="selected" value="red">red</option>
<option value="blue">blue</option>
</select>
<button id="send" onclick="sendMsg()" value="发送消息" type="button" title="send">发送</button>
</body>
</html>
~~~
# 消费者:
RabbitMQ的消息消费者需要监听消息,以及处理收到的消息,所以需要配置一个监听器,声明一个需要监听的队列,和一个消息处理器。消费者端与spring集成的侵入较少。
~~~
<pre name="code" class="plain"> <!-- ========================================RabbitMQ========================================= -->
<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="192.168.5.198" publisher-confirms="true" virtual-host="test" username="test" password="1234" />
<!-- 监听器 -->
<rabbit:listener-container connection-factory="connectionFactory">
<!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
<rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
<rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
</rabbit:listener-container>
<!-- 队列声明 -->
<rabbit:queue name="red" durable="true" />
<!-- 队列声明 -->
<rabbit:queue name="blue" durable="true" />
<!-- 红色监听处理器 -->
<bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" />
<!-- 蓝色监听处理器 -->
<bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" />
~~~
消息处理器可以只是一个通过spring管理的普通Bean对象,需要有一个在xml中定义method同名的方法
~~~
public class RedQueueListener{
private static Logger log = LoggerFactory.getLogger(RedQueueListener.class);
/**
* 处理消息
* @param message
* void
*/
public void onMessage(String message) {
log.info("RedQueueListener--receved:" + message);
}
}
~~~
# RabbitMQ创建消息环境
在项目开发完成后,我们需要在RabbitMQ中创建vhost,user、queues,这样,消费者在启动的时候,登录、声明虚拟机、队列就不会报错。
1\. 创建test虚拟机,然后点击Name标签,进入虚拟机,添加权限
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da19ff0.jpg)
2\. 进入【Admin】菜单【Users】标签创建用户,然后点击用户名称进入详情设置权限
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da308e2.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da43229.jpg)
3\. 进入【Queues】菜单,创建队列
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da567b1.jpg)
# 执行测试:
将生产者和消费者项目添加到Tomcat中,然后启动,在浏览器输入【http://localhost:8080/producer/test.jsp】,进入测试页面,分别发送红色、蓝色的消息到队列,消费者端会显示接收日志
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da6f210.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507da7f629.jpg)
生产者端日志
~~~
2016-01-25 18:09:26 722 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待发送消息: type[blue]-msg[hello world blue rabbit!]
2016-01-25 18:09:26 723 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--发送消息完成: routingKey[blue]-msg[hello world blue rabbit!]
2016-01-25 18:09:28 715 [INFO] c.a.p.c.RabbitController - rabbitmq--收到待发送消息: type[red]-msg[hello world red rabbit!]
2016-01-25 18:09:28 716 [INFO] c.a.p.s.RabbitServiceImpl - rabbitmq--发送消息完成: routingKey[red]-msg[hello world red rabbit!]
~~~
消费者端日志
~~~
2016-01-25 18:09:26 727 [INFO] c.a.c.BlueQueueListener - BlueQueueListener Receved:hello world blue rabbit!
2016-01-25 18:09:28 719 [INFO] c.a.c.RedQueueListener - RedQueueListener Receved:hello world red rabbit!
~~~
【7】 – Window下的安装与配
最后更新于:2022-04-01 15:58:57
RabbitMQ入门教程 For Java【7】 - Window下的安装与配置
# 一、下载
1. RabbitMQ下载地址
进入RabbitMQ官网 : [http://www.rabbitmq.com/](http://www.rabbitmq.com/) 点击右侧【最新版本列表】中的3.6.0版本下载最新版本【rabbitmq-server-3.6.0.exe】。由于RabbitMQ是Erlang语言写的,所以,在安装RabbitMQ之前,必选先安装[Erlang OTP](http://www.erlang.org/download.html)到我们的电脑上.
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d89faba.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d8c13eb.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d8d3401.jpg)
2、Erlang OTP下载安装
Erlang OTP包含了一组库和实现方式,可以构建大规模、容错和分布式的应用程序,包含了许多强大的工具,能够实现H248,SNMP等多种协议. RabbitMQ是基于Erlang OTP开发出来的,所以需要下安装OTP环境。我的电脑是Windows7 64位系统,所以选择Windows 64-bit Binary File. 下载完成后,双击下载的应用【otp_win64_18.2.1.exe】一路下一步安装。安装完成后,需要在系统环境变量中配置一个【ERLANG_HOME】地址指向刚刚安装Erlang的目录:【C:\Program Files\erl7.2.1】,并将这个路径加入系统环境变量Path中【;%ERLANG_HOME%\bin】,“;”冒号是间隔,一定要加!
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d8ec116.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d910890.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d922559.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d935099.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d94939c.jpg)
# 二、RabbitMQ安装
双击下载的应用程序,选择一个安装目录【C:\Program Files\RabbitMQ Server】(保持默认就可以了),安装完成后,需要配置一下环境变量【RABBITMQ_BASE】到【C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0】,并将路径加入到Path这个系统环境变量中【;%RABBITMQ_BASE%\sbin】,然后 进入安装目录【C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.0\sbin】下,双击【rabbitmq-server.bat】启动.
如果安装成功后双击【rabbitmq-server.bat】启动不了,提示【node with name rabbit already running on ***】的错误,就试着删除【C:\Users\Administrator\AppData\Roaming\rabbitmq】这个目录,如果还是没有效果,就点击开始菜单,在所有程序》RabbitMQ Service 》RabbitMQ Service stop,先关闭已经启动的RabbitMQ,然后再启动。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d95bd85.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d96c1fd.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d98019b.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d99151d.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d9a3989.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d9b43b0.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d9c40c0.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d9d5ac0.jpg)
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d9e2e01.jpg)
#
三、配置
安装完成后,需要对RabbitMQ进行配置,在RabbitMQ中,有几个概念需要了解:
virtual hosts : 虚拟主机。类似于数据库中的库的概念。一个RabbitMQ服务器可以有多个虚拟主机,默认虚拟主角是“/” 根目录。
user:访问虚拟主机的用户.
roles: 角色。有none、mamagement、policymaker、monitoring、administrator等权限。
permissions: 权限。分conf、write、read三种权限。类似Linux的执行、写、读权限。
1\. 添加用户【rabbitmqctl add_user username password】
2\. 分配角色【rabbitmqctl set_user_tags username administrator】
3\. 新增虚拟主机【rabbitmqctl add_vhost vhost_name】
4\. 将新虚拟主机授权给新用户【rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'】
#
# 四、角色说明:
**1\. none 最小权限角色**
不能访问管理插件。权限最小。
**2\. management 管理员角**
用户可以通过AMQP协议做的任何事,还包括:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
**3\. policymaker 决策者**
management可以做的任何事,还包括:
查看、创建和删除自己的virtual hosts所属的policies和parameters
**4\. monitoring 监控**
management可以做的任何事,还包括:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
**5.administrator 超级管理员**
policymaker和monitoring可以做的任何事,还包括:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
# 五、权限说明
权限管理的指令格式是:【set_permissions [-p ] 】
的位置分别用正则表达式来匹配特定的资源
例如【'^(amq\.gen.*|amq\.default)$'】可以匹配RabbitMQ服务器默认生成的交换器
【'^$'】不匹配任何资源
# 六,用户、角色、权限、插件配置
**添加用户**
rabbitmqctl add_user [username] [password]
**添加虚拟主机**
rabbitmqctl add_vhost [vhost_name]
**修改虚拟机权限**
rabbitmqctl set_permissions -p [vhost_name] [username] '.*' '.*' '.*'
**设置角色**
rabbitmqctl set_user_tags [username] administrator
**启用web管理界面插件**
rabbitmq-plugins enable rabbitmq_management
【1】 – Hello World
最后更新于:2022-04-01 15:58:55
# RabbitMQ入门教程 For Java【1】 - Hello World
RabbitMQ是消息代理。从本质上说,它接受来自生产者的信息,并将它们传递给消费者。在两者之间,它可以根据你给它的路由,缓冲规则进行传递消息。
如果你的工作中需要用到RabbitMQ,那么我建议你先在电脑上安装好RabbitMQ服务器,然后打开eclipse,跟这我的教程一步步的学习RabbitMQ,这样你会对RabbitMQ有一个全面的认识,而且能打好一个很好的基础。如果你只是了解一下,那就随便看看吧。
### **我的开发环境:**
操作系统:**Windows7 64bit**
开发环境:**JDK 1.7 - 1.7.0_55**
开发工具:**Eclipse Kepler SR2**
RabbitMQ版本:**3.6.0**
Elang版本:**erl7.2.1**
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
### 源码地址
https://github.com/chwshuang/rabbitmq.git
### 一、专业术语
### 1. 生产者:
在现实生活中就好比制造商品的工厂,他们是商品的生产者。生产者只意味着发送。发送消息的程序称之为一个生产者。我们用“P”表示:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d85b863.jpg)
### 2. 队列:
队列就像存放商品的仓库或者商店,是生产商品的工厂和购买商品的用户之间的中转站。队列就像是一个仓库或者流水线。在RabbitMQ中,信息流从你的应用程序出发,来到RabbitMQ的队列,所有信息可以只存储在一个队列中。队列可以存储很多的消息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。多个生产者可以将消息发送到同一个队列中,多个消费者也可以只从同一个队列接收数据。这就是队列的特性。队列用下面的图表示,图上面是队列的名字:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d86a62d.jpg)
### 3. 消费者
消费者就好比是从商店购买或从仓库取走商品的人,消费的意思就是接收。消费者是一个程序,主要是等待接收消息。我们的用“C”表示
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d8767b6.jpg)
**注意:**
生产者,消费者和队列(RabbitMQ)不必部署在同一台机器上。实际在生产环境的大多数应用中,他们都是分开部署的。
### 二、“Hello World”
### 1. 说明
在本教程中,我们我们通过2个java程序,一个发送消息的生产者,和一个接收信息并打印的消费者。想要了解rabbitmq,必须了解一些最基础的内容,我们先从一些代码片段来了解产生信息和接收消息的流程和方法。在编写代码前,我们先来用户一张图表示要做的事,在下图中,“P”是我们的生产者,“C”是我们的消费者。在中间红色框是代表RabbitMQ中的一个消息队列。箭头指向表示信息流的方向。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d885c0b.jpg)
### 2. 项目目录
在eclipse中创建一个rabbitmq的java项目,然后在项目下建一个名为lib的source folder, 然后将rabbitmq官网下载的rabbitmq-java-client-bin-3.6.0.rar解压出rabbitmq-client.jar拷贝到lib目录,如果创建的是Maven项目,只需要添加如下依赖:
~~~
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>
... ...
</dependencies>
~~~
然后在src目录下创建一个com.aitongyi.rabbit.helloworld包,最后项目目录如下:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d793b2e.jpg)
### 3. 消息生产者
~~~
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生产者
*
* @author hushuang
*
*/
public class P {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("localhost");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明一个队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 发送消息到队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("P [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
~~~
### 4. 消息消费者
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消息消费者
*
* @author hushuang
*
*/
public class C {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址
factory.setHost("localhost");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明要关注的队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("C [*] Waiting for messages. To exit press CTRL+C");
// DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("C [x] Received '" + message + "'");
}
};
// 自动回复队列应答 -- RabbitMQ中的消息确认机制,后面章节会详细讲解
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
~~~
### 5. 运行测试
如果在windows7下,你需要先双击【RabbitMQ Server\rabbitmq_server-3.6.0\sbin】目录下的rabbitmq-server.bat,来启动RabbitMQ服务。负责,运行我们的程序时,会提示【java.net.ConnectException: Connection refused: connect】
C [*] Waiting for messages. To exit press CTRL+CC [x] Received 'Hello World!'
先运行消费者服务器来关注【hello】这个队列的情况。
~~~
C [*] Waiting for messages. To exit press CTRL+C
~~~
C [*] Waiting for messages. To exit press CTRL+CC [x] Received 'Hello World!'
然后再运行生产者端,发送消息到队列中:
~~~
P [x] Sent 'Hello World!'
~~~
C [*] Waiting for messages. To exit press CTRL+CC [x] Received 'Hello World!'
再切换到消费者端的控制台,查看日志:
~~~
C [*] Waiting for messages. To exit press CTRL+C
C [x] Received 'Hello World!'
~~~
### 6. 总结
从上面的日志,我们就算是对RabbitMQ 的消息流有了一个基本的了解,如果你想更进一步,请进入到第二章-Work Queues的教程
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708) - 你好世界!
[RabbitMQ入门教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284) - 工作队列
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057) - 发布/订阅
[RabbitMQ入门教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060) - 消息路由
[RabbitMQ入门教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904) - 模糊匹配
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570) - 远程调用
### 提示
由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。
如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。
如果你对rabbitmq有更多想法,可以通过GitHub项目成员找到我们的邮件地址联系他们:https://github.com/orgs/rabbitmq/people
【6】 – Remote procedure call (RPC)
最后更新于:2022-04-01 15:58:53
RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)
### **我的开发环境:**
操作系统:**Windows7 64bit**
开发环境:**JDK 1.7 - 1.7.0_55**
开发工具:**Eclipse Kepler SR2**
RabbitMQ版本: **3.6.0**
Elang版本:**erl7.2.1**
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
### 源码地址
https://github.com/chwshuang/rabbitmq.git
本教程中,我们将学习使用工作队列让多个消费者端来执行耗时的任务。比如我们需要通过远程服务器帮我们计算某个结果。这种模式通常被称之为远程方法调用或RPC.
我们通过RabbitMQ搭建一个RPC系统,一个客户端和一个RPC服务器,客户端有一个斐波那契数列方面的问题需要解决(Fibonacci numbers),RPC服务器负责技术收到这个消息,然后计算结果,并且返回这个斐波那契数列。
### 客户端接口
我们需要创建一个简单的客户端类,通过调用客户端的call方法,来计算结果。
~~~
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
~~~
远程方法调用的注意事项:
RPC在软件开发中非常常见,也经常被批评。当一个程序员对代码不熟悉的时候,跟踪RPC的性能问题是出在本地还是远程服务器就非常麻烦,对于RPC的使用,有几点需要特别说明:
- 使用远程调用时的本地函数最好独立出来
- 保证代码组件之间的依赖关系清晰明了,并用日志记录不同的执行过程和时间
- 发生客户端运行缓慢或者假死时,先确认RPC服务器是否还活着!
- 尽量使用异步队列来处理RPC请求,尽量不要用同步阻塞的方式运行RPC请求
### 回调队列
在RabbitMQ的RPC中,客户端发送请求后,还需要得到一个响应结果,我们需要像下面这样,在发送请求时,带上一个回调队列:
~~~
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
~~~
上面代码中,我们需要引入一个新的类
~~~
import com.rabbitmq.client.AMQP.BasicProperties;
~~~
### 消息属性
传输一条消息,AMQP协议预定义了14个属性,下面几个是使用比较频繁的几个属性:
- deliveryMode:配置一个消息是否持久化。(2表示持久化)这个在第二章中有说明。
- contentType :用来描述编码的MIME类型。与html的MIME类型类似,例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。
- replyTo : 回调队列的名称。
- correlationId:RPC响应请求的相关编号。这个在下一节讲。
### 关联编号 Correlation Id
如果一个客户端有很多的计算任务,按照上面的代码,我们会为每个任务创建一个请求,然后等待返回的结果,这种方法貌似很耗时,如果把所有的任务都放到同一个连接中,那么我们又没法分辨出返回的结果是那个任务的?为了解决这个问题,RabbitMQ提供了一个correlationid属性来解决这个问题。RabbitMQ为每个请求提供唯一的编号,然后在返回队列里如果看到了这个编号,就知道我们的任务处理完成了,如果收到的编号不认识,就可以安全的忽略。
你可能会疑问,如果忽略了,那么想知道这个返回结果的客户端是不是就收不到这个结果了?这个基本上不会出现,但是,理论上也可能发生,例如一个RPC服务器,在发送确认消息前挂了,你收到的消息可能就是不完整的。这种情况,RabbitMQ会重新发送任务处理请求。这也是为什么客户端必须处理这些重复请求以及RPC启用幂次模式。
### 总结:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d847aeb.jpg)
RPC工作方式:
1. 当客户端启动时,会创建一个匿名的回调队列
1. 在RPC请求中,定义了两个属性:replyTo,表示回调队列的名称; correlationId,表示请求任务的唯一编号,用来区分不同请求的返回结果。
1. 将请求发送到rpc_queue队列中
1. RPC服务器等待rpc_queue队列的请求,如果有消息,就处理,它将计算结果发送到请求中的回调队列里。
1. 客户端监听回调队列中的消息,如果有返回消息,它根据回调消息中的correlationid进行匹配计算结果。
### 工程代码
**计算斐波那契数列的方法**
~~~
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
~~~
这个方法只是用来讲解我们的教程,你可别拿它在生产环境跑大数据!下面是客户端的代码
### 服务器端代码:
RPCServer.java
第一步仍然是建立连接、频道和声明队列。
如果我们运行多个RPC服务器,为了达到负载均衡,需要通过channel.basicQos来设置从队列中预取消息的个数。
我们通过basicConsume 访问队列,如果后消息任务来了,我们就开始工作,并将结果发送到回调队列中。
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println("RPCServer [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("RPCServer [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
### 客户端代码
RPCClient.java
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println("RPCClient [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println("RPCClient [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
在客户端,我们也建立一个连接和通道,并声明一个专用的“回调”队列
我们设置调队列中的唯一编号和回调队列名称
然后我们发送任务消息到RPC服务器
接下来循环监听回调队列中的每一个消息,找到与我们刚才发送任务消息编号相同的消息
### 总结:
这里的例子只是RabbitMQ中RPC服务的一个实现,你也可以根据业务需要实现更多。
rpc有一个优点,如果一个RPC服务器处理不来,可以再增加一个、两个、三个。
我们的例子中的代码还比较简单,还有很多问题没有解决:
如果没有发现服务器,客户端如何处理?
如果客户端的RPC请求超时了怎么办?
如果服务器出现了故障,发生了异常,是否将异常发送到客户端?
在处理消息前,怎样防止无效的消息?检查范围、类型?
如果你想还想继续了解RabbitMQ,你可以在RabbitMQ中安装管理插件,然后查看消息队列。
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708) - 你好世界!
[RabbitMQ入门教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284) - 工作队列
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057) - 发布/订阅
[RabbitMQ入门教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060) - 消息路由
[RabbitMQ入门教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904) - 模糊匹配
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570) - 远程调用
【5】 – Topic
最后更新于:2022-04-01 15:58:51
RabbitMQ入门教程 For Java【5】 - Topic
### **我的开发环境:**
操作系统:**Windows7 64bit**
开发环境:**JDK 1.7 - 1.7.0_55**
开发工具:**Eclipse Kepler SR2**
RabbitMQ版本: **3.6.0**
Elang版本:**erl7.2.1**
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
### 源码地址
https://github.com/chwshuang/rabbitmq.git
# Topic模式
匹配模式,如果按照百度翻译和百度百科,直接叫主题或者话题就得了,但是如果你真的明白它在RabbitMQ中代表什么,就不能这么直接的翻译成中文了。如果要用中文理解它的意思,先了解它在RabbitMQ中用来做什么:topic类型的交换器允许在RabbitMQ中使用模糊匹配来绑定自己感兴趣的信息。
所以,我觉得这一章应该叫macth模式更合适,中文 - 匹配模式。
在上一章,通过直连交换器,生产者发送不同路由关键字的日志,消费者端通过绑定自己感兴趣的路由关键字来接收消息,进行完善日志系统。如果我想只接收生产者com.test.rabbitmq.topic包下的日志,其他包的忽略掉,之前的日志系统处理起来可能就非常麻烦,还好,我们有匹配模式,现在我们将生产者发送过来的消息按照包名来命名,那么消费者端就可以在匹配模式下使用【#.topic.*】这个路由关键字来获得感兴趣的消息。
### 匹配交换器
通过匹配交换器,我们可以配置更灵活的消息系统,你可以在匹配交换器模式下发送这样的路由关键字:
“a.b.c”、“c.d”、“quick.orange.rabbit”
不过一定要记住,路由关键字【routingKey】不能超过255个字节(bytes)
匹配交换器的匹配符
- *(星号)表示一个单词
- #(井号)表示零个或者多个单词
### **示例说明:**
这一章的例子中,我们使用三个段式的路由关键字,有三个单词和两个点组成。第一个词是速度,第二个词是颜色,第三个是动物名称。
我们用三个关键字来绑定,Q1绑定关键字是【*.orange.*】,Q2绑定关键字是【*.*.rabbit】和【lazy.#】,然后分析会发生什么:
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d8297a5.jpg)
- Q1会收到所有orange这种颜色相关的消息
- Q2会收到所有rabbit这个动物相关的消息和所有速度lazy的动物的消息
### **分析:**
生产者发送“quick.orange.rabbit”的消息,两个队列都会收到
生产者发送“lazy.orange.elephant”,两队列也都会收到。
生产者发送"quick.orange.fox",那么只有Q1会收到。
生产者发送"lazy.brown.fox",那么只会有Q2能收到。
生产者发送"quick.brown.fox",那么这条消息会被丢弃,谁也收不到。
生产者发送"quick.orange.male.rabbit",这个消息也会被丢弃,谁也收不到。
生产者发送"lazy.orange.male.rabbit",这个消息会被Q2的【lazy.#】规则匹配上,发送到Q2队列中。
### 注意
交换器在匹配模式下:
如果消费者端的路由关键字只使用【#】来匹配消息,在匹配【topic】模式下,它会变成一个分发【fanout】模式,接收所有消息。
如果消费者端的路由关键字中没有【#】或者【*】,它就变成直连【direct】模式来工作。
### 测试代码
### 包图
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d793b2e.jpg)
### 代码
ReceiveLogsTopic1.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个匹配模式的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 路由关键字
String[] routingKeys = new String[]{"*.orange.*"};
// 绑定路由关键字
for (String bindingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
}
System.out.println("ReceiveLogsTopic1 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic1 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
ReceiveLogsTopic2.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个匹配模式的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
// 路由关键字
String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
// 绑定路由关键字
for (String bindingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
}
System.out.println("ReceiveLogsTopic2 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("ReceiveLogsTopic2 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
TopicSend.java
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class TopicSend {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// 声明一个匹配模式的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 待发送的消息
String[] routingKeys = new String[]{"quick.orange.rabbit",
"lazy.orange.elephant",
"quick.orange.fox",
"lazy.brown.fox",
"quick.brown.fox",
"quick.orange.male.rabbit",
"lazy.orange.male.rabbit"};
// 发送消息
for(String severity :routingKeys){
String message = "From "+severity+" routingKey' s message!";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
~~~
先运行ReceiveLogsTopic1.java
~~~
ReceiveLogsTopic1 [*] Waiting for messages. To exit press CTRL+C
~~~
再运行ReceiveLogsTopic2
~~~
ReceiveLogsTopic2 exchange:topic_logs, queue:amq.gen-JwqUJNUGpdFGkeY5B6TsLw, BindRoutingKey:*.*.rabbit
ReceiveLogsTopic2 exchange:topic_logs, queue:amq.gen-JwqUJNUGpdFGkeY5B6TsLw, BindRoutingKey:lazy.#
ReceiveLogsTopic2 [*] Waiting for messages. To exit press CTRL+C
~~~
然后运行TopicSend.java发送7条消息
~~~
TopicSend [x] Sent 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
TopicSend [x] Sent 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
TopicSend [x] Sent 'quick.orange.fox':'From quick.orange.fox routingKey' s message!'
TopicSend [x] Sent 'lazy.brown.fox':'From lazy.brown.fox routingKey' s message!'
TopicSend [x] Sent 'quick.brown.fox':'From quick.brown.fox routingKey' s message!'
TopicSend [x] Sent 'quick.orange.male.rabbit':'From quick.orange.male.rabbit routingKey' s message!'
TopicSend [x] Sent 'lazy.orange.male.rabbit':'From lazy.orange.male.rabbit routingKey' s message!'
~~~
再看ReceiveLogsTopic1.java,收到3条匹配的消息。
~~~
ReceiveLogsTopic1 [x] Received 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
ReceiveLogsTopic1 [x] Received 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
ReceiveLogsTopic1 [x] Received 'quick.orange.fox':'From quick.orange.fox routingKey' s message!'
~~~
再看ReceiveLogsTopic2.java,收到4条匹配的消息。
~~~
ReceiveLogsTopic2 [x] Received 'quick.orange.rabbit':'From quick.orange.rabbit routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.orange.elephant':'From lazy.orange.elephant routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.brown.fox':'From lazy.brown.fox routingKey' s message!'
ReceiveLogsTopic2 [x] Received 'lazy.orange.male.rabbit':'From lazy.orange.male.rabbit routingKey' s message!'
~~~
最后,咱们来开动脑筋看看下面的题,当是我留的课外作业:
1. 在匹配交互器模式下,消费者端路由关键字 “*” 能接收到生产者端发来路由关键字为空的消息吗?
1. 在匹配交换器模式下,消费者端路由关键字“#.*”能接收到生产者端以“..”为关键字的消息吗?如果发送来的消息只有一个单词,能匹配上吗?
1. “a.*.#” 与 “a.#” 有什么不同吗?
如果你的课外作业做完,并且自己动手验证过,OK,进入第六章-远程方法调用吧!
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708) - 你好世界!
[RabbitMQ入门教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284) - 工作队列
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057) - 发布/订阅
[RabbitMQ入门教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060) - 消息路由
[RabbitMQ入门教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904) - 模糊匹配
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570) - 远程调用
### 提示
由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。
如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。
### 获得帮助
如果你阅读这个教程有障碍,可以通过GitHub项目成员找到开发者的邮件地址联系他们。
~~~
https://github.com/orgs/rabbitmq/people
~~~
【3】 – Publish/Subscribe
最后更新于:2022-04-01 15:58:48
# RabbitMQ入门教程 For Java【3】 - Publish/Subscribe
我的开发环境:
操作系统: Windows7 64bit
开发环境: JDK 1.7 - 1.7.0_55
开发工具: Eclipse Kepler SR2
RabbitMQ版本: 3.6.0
Elang版本: erl7.2.1
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
源码地址
[https://github.com/chwshuang/rabbitmq.git](https://github.com/chwshuang/rabbitmq.git)
~~~
在上一章中,我们学习创建了一个消息队列,她的每个任务消息只发送给一个工人。这一章,我们会将同一个任务消息发送给多个工人。这种模式就是“发布/订阅”。
~~~
为了说明这种模式,我们将以一个日志系统进行讲解:一个日志发送者,两个日志接收者,接收者1可以把这条日志写入到磁盘上,另外一个接收者2可以将这条日志打印到控制台中。
“发布/订阅”模式的基础是将消息广播到所有的接收器上。
### 交换器
在之前的教程中,我们都是直接在消息队列中进行发送和接收消息,现在开始要介绍RabbitMQ完整的消息模型了。
首先,我们先来回顾一下之前学到关于RabbitMQ的内容:
- 生产者是发送消息的应用程序
- 队列是存储消息的缓冲区
- 消费者是接收消息的应用程序
实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,她只会将消息发送到一个交换器,交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道她所接收的消息是什么?它应该被放到那个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-18_56c53cbd5232a.jpg "")
#### **交换器的规则有:**
- direct (直连)
- topic (主题)
- headers (标题)
- fanout (分发)也有翻译为扇出的。
我们将使用【fanout】类型创建一个名称为 logs的交换器,
~~~
channel.exchangeDeclare("logs", "fanout");
~~~
分发交换器很简单,你通过名称也能想到,她是广播所有的消息,
> 交换器列表
通过rabbitmqctl list_exchanges指令列出服务器上所有可用的交换器
~~~
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
~~~
> 这个列表里面所有以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。
> **匿名交换器**
在之前的教程中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
来看看我们之前的代码:
~~~
channel.basicPublish("", "hello", null, message.getBytes());
~~~
> 第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。
第二个参数是【routingKey】路由线索
**匿名交换器规则:**
发送到routingKey名称对应的队列。
现在,我们可以发送消息到交换器中:
~~~
channel.basicPublish( "logs", "", null, message.getBytes());
~~~
### 临时队列
记得前两章中使用的队列指定的名称吗?(Hello World和task_queue).
如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:
1. 首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
1.
一旦我们断开消费者,队列应该立即被删除。
在Java客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。
~~~
String queueName = channel.queueDeclare().getQueue();
~~~
通过上面的代码就能获取到一个随机队列名称。
例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。
### 绑定
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-18_56c53cbd60cec.jpg "")
如果我们已经创建了一个分发交换器和队列,现在我们就可以就将我们的队列跟交换器进行绑定。
~~~
channel.queueBind(queueName, "logs", "");
~~~
执行完这段代码后,日志交换器会将消息添加到我们的队列中。
> **绑定列表**
如果要查看绑定列表,可以执行【rabbitmqctl list_bindings】命令
### 全部代码
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-18_56c53cbd74d47.jpg "")
### 目录
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d7b6df7.jpg "")
生产者程序,他负责发送日志消息,与之前不同的是它不是将消息发送到匿名交换器中,而是发送到一个名为【logs】的交换器中。我们提供一个空字符串的routingkey,它的功能被交换器的分发类型代替了。下面是EmitLog.java的代码:
~~~
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 分发消息
for(int i = 0 ; i < 5; i++){
String message = "Hello World! " + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
~~~
上面的代码中,在建立连接后,我们声明了一个交互。如果当前没有队列被绑定到交换器,消息将被丢弃,因为没有消费者监听,这条消息将被丢弃。
下面的代码是接收日志ReceiveLogs1.java 和ReceiveLogs2.java:
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
### 运行
先运行ReceiveLogs1和ReceiveLogs2可以看到日志:
~~~
[*] Waiting for messages. To exit press CTRL+C
~~~
然后运行EmitLog:
~~~
EmitLog日志:
[x] Sent 'Hello World! 0'
[x] Sent 'Hello World! 1'
[x] Sent 'Hello World! 2'
[x] Sent 'Hello World! 3'
[x] Sent 'Hello World! 4'
ReceiveLogs1和ReceiveLogs2日志
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World! 0'
[x] Received 'Hello World! 1'
[x] Received 'Hello World! 2'
[x] Received 'Hello World! 3'
[x] Received 'Hello World! 4'
~~~
看到这里,说明我们的程序运行正常,消费者通过声明【logs】交换器和【fanout】类型,接收到了来自【logs】交换器的所有消息。
使用【rabbitmqctl list_bindings】命令可以看到两个临时队列的名称
~~~
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
~~~
以上就是这一章讲的发布/订阅模式,下一章将介绍消息路由(Routing)
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World - 你好世界!](http://blog.csdn.net/chwshuang/article/details/50521708)
[RabbitMQ入门教程 For Java【2】 - Work Queues - 工作队列](http://blog.csdn.net/chwshuang/article/details/50506284)
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe - 发布/订阅](http://blog.csdn.net/chwshuang/article/details/50512057)
[RabbitMQ入门教程 For Java【4】 - Routing - 消息路由](http://blog.csdn.net/chwshuang/article/details/50505060)
[RabbitMQ入门教程 For Java【5】 - Topic - 模糊匹配](http://blog.csdn.net/chwshuang/article/details/50516904)
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC) - 远程调用](http://blog.csdn.net/chwshuang/article/details/50518570)
【2】 – Work Queues
最后更新于:2022-04-01 15:58:46
RabbitMQ入门教程 For Java【2】 - Work Queues
提示:
我的开发环境:
操作系统: Windows7 64bit
开发环境: JDK 1.7 - 1.7.0_55
开发工具: Eclipse Kepler SR2
RabbitMQ版本: 3.6.0
Elang版本: erl7.2.1
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
源码地址
[https://github.com/chwshuang/rabbitmq.git](https://github.com/chwshuang/rabbitmq.git)
# 工作队列
![工作队列](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-18_56c53cbccaaab.jpg "")
> 在使用此教程的时候,请记住,一定要将所有内容看一遍,特别是代码片段说明部分,这是非常重要的基础,如果你跳过这里直接将最后的源码拿去运行查看,效果会大打折扣。基础不牢固的情况下,后面学习就更难了。学习一定要静下心、琢磨透。
在第一个教程中,我们编写了一个程序来发送和接收来自一个指定队列的消息。在这一篇,我们将创建一个工作队列,将信息发送到多个消费者。这中分配方式主要场景是消费者需要根据消息中的内容进行业务逻辑处理,这种消息可以看成是一个任务指令,处理起来比较耗时,通过多个消费者来处理这些消息,来提高数据的吞吐能力。
工作队列(即任务队列)的主要思想是不用一直等待资源密集型的任务处理完成,这就像一个生产线,将半成品放到生产线中,然后在生产线后面安排多个工人同时对半成品进行处理,这样比一个生产线对应一个工人的吞吐量大几个数量级。
准备
在第一篇教程中,我们通过Hello World的例子,从生产者发送一条消息到RabbitMQ,然后消费者接收到这条消息并打印出来。这次我们模拟一个工厂流水线的场景,由工厂任务安排者(生产者P)向流水线(RabbitMQ的队列hello)放入半成品,然后由多个工人(消费者C1和C2)从流水线获取半成品进行处理。
我们先来看看工厂任务安排者的代码,我们一共发送5条消息,然后给每个消息编号,看看消费者分别收到了那些消息:
~~~
for(int i = 0 ; i < 5; i++){
String message = "Hello World! " + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
~~~
再来看看流水线上的工人处理半成品的函数,我们使用线程休眠模拟工作处理一条消息花费1秒钟:
~~~
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
~~~
### 源码
目录结构:
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d7b6df7.jpg "")
工厂任务安排者(生产者P)NewTask.java:
~~~
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @author hushuang
*
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws java.io.IOException, Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 分发消息
for(int i = 0 ; i < 5; i++){
String message = "Hello World! " + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
}
~~~
工人(消费者C1和C2)Worker1.java
~~~
import java.io.IOException;
public class Worker1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
// 每次从队列中获取数量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker1 [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println("Worker1 [x] Done");
// 消息处理完成确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消息消费完成确认
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
~~~
工人(消费者C1和C2)Worker2.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
// 每次从队列中获取数量
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Worker2 [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println("Worker2 [x] Done");
// 消息处理完成确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消息消费完成确认
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
/**
* 任务处理
*
* @param task
* void
*/
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
~~~
### 消息轮询分发
#### **启动RabbitMQ 服务器**
在RabbitMQ Server\rabbitmq_server-3.6.0\sbin目录中,我们双击rabbitmq-server.bat,启动RabbitMQ ,Window下会弹出一个窗口,看到下面Starting broker…的信息就说明启动成功了:(关于RabbitMQ 在Windows7下的安装参考这里)
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d7cac0d.jpg "")
#### **启动工人(消费者)**
然后在eclipse中,启动Worker1.java 和Worker2.java,可以看到Worker的启动日志:
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d7dea0b.jpg "")
~~~
Worker1 [*] Waiting for messages. To exit press CTRL+C
Worker2 [*] Waiting for messages. To exit press CTRL+C
~~~
#### **启动工厂任务安排者(生产者)**
生产者启动后打印的日志:
~~~
[x] Sent 'Hello World! 0'
[x] Sent 'Hello World! 1'
[x] Sent 'Hello World! 2'
[x] Sent 'Hello World! 3'
[x] Sent 'Hello World! 4'
~~~
Worker1日志输入结果:
~~~
Worker1 [x] Received 'Hello World! 1'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 3'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 4'
Worker1 [x] Done
~~~
Worker2日志输入结果:
~~~
Worker2 [x] Received 'Hello World! 0'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 2'
Worker2 [x] Done
~~~
### 消息确认
如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。
为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费者端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。
如果RabbitMQ向消费者发送消息时,消费者服务器挂了,消息也不会有超时;即使一个消息需要非常长的时间处理,也不会导致消息超时。这样消息永远不会从RabbitMQ服务器中删除。只有当消费者正确的发送ACK确认反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的。在上面的代码中,我们显示返回autoAck=true 这个标签。
看看下面的代码,即使你在发送消息过程中,停掉一个消费者,消费者没有通过ACK反馈确认的消息,很快会被退回。
~~~
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
~~~
> 忘记确认
忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。
如果要监控这种错误,可以使用rabbitmqctl messages_unacknowledged命令打印出出相关的信息。
~~~
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
~~~
### 消息持久化
通过上一节我们已经知道如何确保消费者挂掉的情况下,任务不会消失。但是如果RabbitMQ服务器挂了呢?
如果你不告诉RabbitMQ,当RabbitMQ服务器挂了,她可能就丢失所有队列中的消息和任务。如果你想让RabbitMQ记住她当前的状态和内容,就需要通过2件事来确保消息和任务不会丢失。
第一件事,在队列声明时,告诉RabbitMQ,这个队列需要持久化:
~~~
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
~~~
上面的这个方法是正确的,当在我们的例子中也无法持久化!因为已经定义的队列,再次定义是无效的,这就是幂次原理。RabbitMQ不允许重新定义一个已有的队列信息,也就是说不允许修改已经存在的队列的参数。如果你非要这样做,只会返回异常。
咋整?
一个快速有效的方法就是重新声明另一个名称的队列,不过这需要修改生产者和消费者的代码,所以,在开发时,最好是将队列名称放到配置文件中。
这时,即使RabbitMQ服务器重启,新队列中的消息也不会丢失。
下面我们来看看新消息发送的代码:
~~~
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
~~~
> 关于消息持久化的说明
标记为持久化后的消息也不能完全保证不会丢失。虽然已经告诉RabbitMQ消息要保存到磁盘上,但是理论上,RabbitMQ已经接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),那么重启后,RabbitMQ中的这条未及时保存的消息就会丢失。因为RabbitMQ不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么你可以考虑使用生产者确认反馈机制。
### 负载均衡
默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。
这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来啊?
就像下面这个图:
![这里写图片描述](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-02-18_56c53cbd12e52.jpg "")
为了解决这个问题,我们可以使用【prefetchcount = 1】这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。
~~~
int prefetchCount = 1;
channel.basicQos(prefetchCount);
~~~
> 关于队列大小的说明
你必选注意:如果所有的消费者负载都很高,你的队列很可能会被塞满。这时你需要增加更多的消费者或者其他方案。
想了解更多关于 Channel 方法和 MessageProperties 的信息,请浏览以下相关的文档:
[javadocs 在线文档.](http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/)
现在,咱们可以进入第三章的教程了。
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World - 你好世界!](http://blog.csdn.net/chwshuang/article/details/50521708)
[RabbitMQ入门教程 For Java【2】 - Work Queues - 工作队列](http://blog.csdn.net/chwshuang/article/details/50506284)
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe - 发布/订阅](http://blog.csdn.net/chwshuang/article/details/50512057)
[RabbitMQ入门教程 For Java【4】 - Routing - 消息路由](http://blog.csdn.net/chwshuang/article/details/50505060)
[RabbitMQ入门教程 For Java【5】 - Topic - 模糊匹配](http://blog.csdn.net/chwshuang/article/details/50516904)
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC) - 远程调用](http://blog.csdn.net/chwshuang/article/details/50518570)
### 提示
由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。
如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。
### 获得帮助
如果你阅读这个教程有障碍,可以通过GitHub项目成员找到开发者的邮件地址联系他们。
~~~
https://github.com/orgs/rabbitmq/people
~~~
【4】 -Routing
最后更新于:2022-04-01 15:58:44
# RabbitMQ入门教程 For Java【4】 - [Routing](http://www.rabbitmq.com/tutorials/tutorial-four-python.html)
### **我的开发环境:**
操作系统:**Windows7 64bit**
开发环境:**JDK 1.7 - 1.7.0_55**
开发工具:**Eclipse Kepler SR2**
RabbitMQ版本: **3.6.0**
Elang版本:**erl7.2.1**
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
### 源码地址
https://github.com/chwshuang/rabbitmq.git
### 消息路由
上一章教程中我们建立了一个简单的日志记录系统,能够将消息广播到多个消费者。本章,我们将添加一个新功能,类似订阅消息的子集。例如:我们只接收日志文件中ERROR类型的日志。
### 绑定关系
在之前的例子中也使用了类似的方式:
~~~
channel.queueBind(queueName, EXCHANGE_NAME, "");
~~~
绑定是交换器和队列之间的一种关系,用户微博,微信的例子可以简单的理解为关注,就是队列(某屌丝)对交换器(女神)非常感兴趣,关注了她,以后女神发的每条微博,屌丝都能看到。
绑定可以使用routingkey这个参数,是为了避免所有的消息都使用同一个路由线索带来的麻烦。为了区分路由规则,我们创建创建一个唯一的路由线索。
~~~
channel.queueBind(queueName, EXCHANGE_NAME, "black");
~~~
绑定关系中使用的路由关键字【routingkey】是否有效取决于交换器的类型。如果交换器是分发【fanout】类型,就会忽略路由关键字【routingkey】的作用。
### 直连类型交换器
上一章的例子是通过分发【fanout】类型的交换器【logs】广播日志信息,现在我们将日志分debug、info、warn、error这几种基本的级别,实际在生产环境中,避免磁盘空间浪费,应用只会将error级别的日志打印出来。而分发【fanout】类型的交换器会将所有基本的日志都发送出来,如果我们想只接收某一级别的日志信息,就需要使用直连【direct】类型的交换器了, 下面的图中,队列1通过ERROR这个routingkey绑定到E交换器,队列2通过WARN和INFO绑定到E交换器,E交换器的类型是直连【direct】的,如果生产者【P】发出ERROR的日志,只会有队列1会收到,如果生产者【P】发出INFO和WARN的日志,只有队列2会收到,如果生产者【P】发出DEBUG级别的日志,队列1和队列2都会忽略它。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d75d06e.jpg)
### 多重绑定
我们允许多个队列以相同的路由关键字绑定到同一个交换器中,可以看到,交换器虽然是直连类型,但是绑定后的效果却跟分发类型的交换器类似,相同的是队列1和队列2都会收到同一条来自交换器的消息。
他们的区别:分发模式下,队列1、队列2会收到所有级别(除ERROR级别以外)的消息,而直连模式下,他们仅仅只会收到ERROR关键字类型的消息。
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d770c51.jpg)
### 发送日志消息
我们还是用日志系统进行讲解,现在我们用日志的级别来作为路由关键字【routingkey】,这样,消费者端就可以按照他关心的日志级别进行接收,我们先看看如何发送日志:
先声明交换器
~~~
<span style="font-size: 18px;"> </span>channel.exchangeDeclare(EXCHANGE_NAME, "direct");
~~~
然后发送消息到交换器
~~~
for (String severity : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
}
~~~
### 订阅消息
我们先获取一个随机的队列名称,然后根据多个路由关键字【routingkey】将队列和交换器绑定起来:
~~~
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
~~~
### 项目说明
### 流程图
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d77fafc.jpg)
### 包图
![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-01_56d507d793b2e.jpg)
### 代码
RoutingSendDirect.java
~~~
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
* @author hushuang
*
*/
public class RoutingSendDirect {
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送消息
for(String severity :routingKeys){
String message = "Send the message level:" + severity;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
}
~~~
ReceiveLogsDirect1.java和ReceiveLogsDirect2.java
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 获取匿名队列名称
String queueName = channel.queueDeclare().getQueue();
// 根据路由关键字进行多重绑定
for (String severity : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
}
System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
~~~
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsDirect2 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKeys = new String[]{"error"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 获取匿名队列名称
String queueName = channel.queueDeclare().getQueue();
// 根据路由关键字进行多重绑定
for (String severity : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
}
System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
~~~
先运行ReceiveLogsDirect1.java和ReceiveLogsDirect2.java
查看日志,我们发现RabbitMQ中已经创建了direct_logs的交换器,以及amq.gen-dVUpkqxmladY3Jg1upDsDQ 和amq.gen-skrmBAlYKSDzELKtVg_zFw这两个临时队列,
~~~
ReceiveLogsDirect1 exchange:direct_logs, queue:amq.gen-skrmBAlYKSDzELKtVg_zFw, BindRoutingKey:info
ReceiveLogsDirect1 exchange:direct_logs, queue:amq.gen-skrmBAlYKSDzELKtVg_zFw, BindRoutingKey:warning
ReceiveLogsDirect1 exchange:direct_logs, queue:amq.gen-skrmBAlYKSDzELKtVg_zFw, BindRoutingKey:error
ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL+C
~~~
~~~
ReceiveLogsDirect2 exchange:direct_logs, queue:amq.gen-dVUpkqxmladY3Jg1upDsDQ, BindRoutingKey:error
ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL+C
~~~
运行RoutingSendDirect.java发送消息:
### 运行结果
查看日志:
RoutingSendDirect.java
~~~
[x] Sent 'info':'Send the message level:info'
[x] Sent 'warning':'Send the message level:warning'
[x] Sent 'error':'Send the message level:error'
~~~
ReceiveLogsDirect1.java
~~~
[x] Received 'info':'Send the message level:info'
[x] Received 'warning':'Send the message level:warning'
[x] Received 'error':'Send the message level:error'
~~~
ReceiveLogsDirect2.java
~~~
[x] Received 'error':'Send the message level:error'
~~~
我们看到,队列1收到了所有的消息,队列2只收到了error级别的消息。这与我们的预期一样。
下一阶段我们可以进入第五章-主题的学习了。
本教程所有文章:
[RabbitMQ入门教程 For Java【1】 - Hello World](http://blog.csdn.net/chwshuang/article/details/50521708) - 你好世界!
[RabbitMQ入门教程 For Java【2】 - Work Queues](http://blog.csdn.net/chwshuang/article/details/50506284) - 工作队列
[RabbitMQ入门教程 For Java【3】 - Publish/Subscribe](http://blog.csdn.net/chwshuang/article/details/50512057) - 发布/订阅
[RabbitMQ入门教程 For Java【4】 - Routing](http://blog.csdn.net/chwshuang/article/details/50505060) - 消息路由
[RabbitMQ入门教程 For Java【5】 - Topic](http://blog.csdn.net/chwshuang/article/details/50516904) - 模糊匹配
[RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)](http://blog.csdn.net/chwshuang/article/details/50518570) - 远程调用
前言
最后更新于:2022-04-01 15:58:42
> 原文出处:[RabbitMQ Java入门教程](http://blog.csdn.net/column/details/rabbitmq-for-java.html)
作者:[chwshuang](http://blog.csdn.net/chwshuang)
**本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!**
# RabbitMQ Java入门教程
> 讲解如何使用Java客户端使用RabbitMQ的功能,从Hello World开始,到工作队列、发布/订阅、消息路由、模糊匹配到远程方法调用,将RabbitMQ的所有基础进行讲解,教程简单,深入浅出,是Java开发人员了解RabbitMQ的不二之选。