第九章 消息和异步Stream

最后更新于:2022-03-31 23:46:13

# 消息驱动 微服务的目的: 松耦合 事件驱动的优势:高度解耦 Spring Cloud Stream 的几个概念 Spring Cloud Stream is a framework for building message-driven microservice applications. 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。 Spring Cloud Stream Application 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式 Binder Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。 通过 binder ,可以很方便的连接中间件,可以动态的改变消息的 destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。 甚至可以任意的改变中间件的类型而不需要修改一行代码。 Publish-Subscribe 消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。 这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。 Consumer Groups “Group”,如果使用过 Kafka 的童鞋并不会陌生。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。 微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。 Durability 消息事件的持久化是必不可少的。Spring Cloud Stream 可以动态的选择一个消息队列是持久化,还是 present。 Bindings bindings 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。 基于 RabbitMQ 使用 以下内容源码: spring cloud demo 消息接收 Spring Cloud Stream 基本用法,需要定义一个接口,如下是内置的一个接口。 ~~~ public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); } ~~~ 注释\_\_ @Input\_\_ 对应的方法,需要返回 \_\_ SubscribableChannel \_\_ ,并且参入一个参数值。 这就接口声明了一个\_\_ binding \_\_命名为 “input” 。 其他内容通过配置指定: ~~~ spring: cloud: stream: bindings: input: destination: mqTestDefault ~~~ destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault ~~~ @SpringBootApplication @EnableBinding(Sink.class) public class Application { // 监听 binding 为 Sink.INPUT 的消息 @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("一般监听收到:" + message.getPayload()); } public static void main(String[] args) { SpringApplication.run(Application.class); } } ~~~ 定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 \_\_ @StreamListener(Processor.INPUT) \_\_,方法参数为 Message 。 启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。 所有发送 exchange 为“mqTestDefault” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。 以上代码就完成了最基本的消费者部分。 消息发送 消息的发送同消息的接受,都需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口: ~~~ public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); } ~~~ 这就接口声明了一个 binding 命名为 “output” ,不同于上述的 “input”,这个binding 声明了一个消息输出流,也就是消息的生产者。 ~~~ spring: cloud: stream: bindings: output: destination: mqTestDefault contentType: text/plain ~~~ contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 mqTestDefault 的所有消息队列中。 代码中调用: ~~~ @SpringBootApplication @EnableBinding(Source.class) public class Application implements CommandLineRunner { @Autowired @Qualifier("output") MessageChannel output; @Override public void run(String... strings) throws Exception { // 字符串类型发送MQ System.out.println("字符串信息发送"); output.send(MessageBuilder.withPayload("大家好").build()); } public static void main(String[] args) { SpringApplication.run(Application.class); } } ~~~ 通过注入MessageChannel的方式,发送消息。 通过注入Source 接口的方式,发送消息。 具体可以查看样例 以上代码就完成了最基本的生产者部分。 自定义消息发送接收 自定义接口 Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。 ~~~ interface OrderProcessor { String INPUT_ORDER = "inputOrder"; String OUTPUT_ORDER = "outputOrder"; @Input(INPUT_ORDER) SubscribableChannel inputOrder(); @Output(OUTPUT_ORDER) MessageChannel outputOrder(); } ~~~ 一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。 使用时,需要在 @EnableBinding 注解中,添加自定义的接口。 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT\_ORDER ~~~ spring: cloud: stream: defaultBinder: defaultRabbit bindings: inputOrder: destination: mqTestOrder outputOrder: destination: mqTestOrder ~~~ 如上配置,指定了 destination 为 mqTestOrder 的输入输出流。 分组与持久化 上述自定义的接口配置中,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,对应的连接关闭的时候,该队列也会消失。而在实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。 只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.\[channelName\].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX。 rabbitMQ routing key 绑定 用惯了 rabbitMQ 的童鞋,在使用的时候,发现 Spring Cloud Stream 的消息投递,默认是根据 destination + group 进行区分,所有的消息都投递到 routing key 为 “#‘’ 的消息队列里。 如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置: ~~~ spring: cloud: stream: bindings: inputProductAdd: destination: mqTestProduct group: addProductHandler # 拥有 group 默认会持久化队列 outputProductAdd: destination: mqTestProduct rabbit: bindings: inputProductAdd: consumer: bindingRoutingKey: addProduct.* # 用来绑定消费者的 routing key outputProductAdd: producer: routing-key-expression: '''addProduct.*''' # 需要用这个来指定 RoutingKey ~~~ spring.cloud.stream.rabbit.bindings.\[channelName\].consumer.bindingRoutingKey 指定了生成的消息队列的routing key spring.cloud.stream.rabbit.bindings.\[channelName\].producer.routing-key-expression 指定了生产者消息投递的routing key DLX 队列 DLX 作用 DLX:Dead-Letter-Exchange(死信队列)。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况: 消息被拒绝(basic.reject/ basic.nack)并且requeue=false 消息TTL过期(参考:RabbitMQ之TTL(Time-To-Live 过期时间)) 队列达到最大长度 DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。 Spring Cloud Stream 中使用 ~~~ spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true ~~~ 配置说明,可以参考 spring cloud stream rabbitmq consumer properties 结论 Spring Cloud Stream 最大的方便之处,莫过于抽象了事件驱动的一些概念,对于消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。 # [RabbitMq使用概况](http://localhost:8028/article/rabbitmq) 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 \*\*消息队列(Message Queue)\*\*是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。 为何用消息队列 从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢? 以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。 以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。 RabbitMQ 特点 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括: 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。 RabbitMQ 中的概念模型 消息模型 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。 消息流 RabbitMQ 基本概念 上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念: RabbitMQ 内部结构 Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 Connection 网络连接,比如一个TCP连接。 Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 Broker 表示消息队列服务器实体。 AMQP 中的消息路由 AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-7fd73af768f28704.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/484/format/webp) AMQP 的消息路由过程 Exchange 类型 Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型: direct ![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-13db639d2c22f2aa.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/385/format/webp)direct 交换器 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。 fanout ![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-2f509b7f34c47170.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/463/format/webp)fanout 交换器 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。 topic![请输入图片描述](https://upload-images.jianshu.io/upload_images/5015984-275ea009bdf806a0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/558/format/webp)topic 交换器 topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。 RabbitMQ 安装 一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac 如果是Mac 用户,使用 HomeBrew 来安装,安装前要先更新 brew: brew update 接着安装 rabbitmq 服务器: brew install rabbitmq 这样 RabbitMQ 就安装好了,安装过程中会自动其所依赖的 Erlang 。 docker下启动安装rabbitmq: ~~~ docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.7.3-management ~~~ RabbitMQ 运行和管理 启动 启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是: ./sbin/rabbitmq-server 启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。 正常启动 后台启动 如果想让 RabbitMQ 以守护程序的方式在后台运行,可以在启动的时候加上 -detached 参数: ~~~ ./sbin/rabbitmq-server -detached ~~~ 查询服务器状态 sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。 查询 RabbitMQ 服务器的状态信息可以用参数 status : ~~~ ./sbin/rabbitmqctl status ~~~ 该命令将输出服务器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等 关闭 RabbitMQ 节点 我们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。 如果要关闭整个 RabbitMQ 节点可以用参数 stop : ~~~ ./sbin/rabbitmqctl stop ~~~ 它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n : ~~~ ./sbin/rabbitmqctl -n rabbit@server.example.com stop ~~~ \-n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。 关闭 RabbitMQ 应用程序 如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop\_app: ~~~ ./sbin/rabbitmqctl stop_app ~~~ 这个命令在后面要讲的集群模式中将会很有用。 启动 RabbitMQ 应用程序 ~~~ ./sbin/rabbitmqctl start_app ~~~ 重置 RabbitMQ 节点 ~~~ ./sbin/rabbitmqctl reset ~~~ 该命令将清除所有的队列。 查看已声明的队列 ~~~ ./sbin/rabbitmqctl list_queues ~~~ 查看交换器 ~~~ ./sbin/rabbitmqctl list_exchanges ~~~ 该命令还可以附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除: ~~~ ./sbin/rabbitmqctl list_exchanges name type durable auto_delete ~~~ 查看绑定 ~~~ ./sbin/rabbitmqctl list_bindings ~~~ Java 客户端访问 RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。 maven工程的pom文件中添加依赖 ~~~ <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> 消息生产者 package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } ~~~ 消息消费者 ~~~ package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //建立到代理服务器到连接 Connection conn = factory.newConnection(); //获得信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,通过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } ~~~ 启动 RabbitMQ 服务器 ~~~ ./sbin/rabbitmq-server ~~~ 运行 Consumer 先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。 运行 Producer 接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息: Consumer 控制台 RabbitMQ 集群 RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。 RabbitMQ 集群中的一些概念 RabbitMQ 会始终记录以下四种类型的内部元数据: 队列元数据 包括队列名称和它们的属性,比如是否可持久化,是否自动删除 交换器元数据 交换器名称、类型、属性 绑定元数据 内部是一张表格记录如何将消息路由到队列 vhost 元数据 为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性 在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。 如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。 RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点可以动态的加入到集群中。 当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢? RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但知道该节点恢复,否则无法更改任何东西。 RabbitMQ 集群配置和启动 如果是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第二、第三个节点将会因为节点名称和端口冲突导致启动失败。所以在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ\_NODENAME 和 RABBITMQ\_NODE\_PORT 来明确指定唯一的节点名称和端口。下面的例子端口号从5672开始,每个新启动的节点都加1,节点也分别命名为test\_rabbit\_1、test\_rabbit\_2、test\_rabbit\_3。 启动第1个节点: ~~~ RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached ~~~ 启动第2个节点: ~~~ RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached ~~~ 启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,否则会存在使用了某个插件的端口号冲突,导致节点启动不成功。 现在第2个节点和第1个节点都是独立节点,它们并不知道其他节点的存在。集群中除第一个节点外后加入的节点需要获取集群中的元数据,所以要先停止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入并且获取集群的元数据,最后重新启动 RabbitMQ 应用程序。 停止第2个节点的应用程序: ~~~ ./sbin/rabbitmqctl -n test_rabbit_2 stop_app ~~~ 重置第2个节点元数据: ~~~ ./sbin/rabbitmqctl -n test_rabbit_2 reset ~~~ 第2节点加入第1个节点组成的集群: ~~~ ./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost ~~~ 启动第2个节点的应用程序 ~~~ ./sbin/rabbitmqctl -n test_rabbit_2 start_app ~~~ 第3个节点的配置过程和第2个节点类似: ~~~ RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app ~~~ RabbitMQ 集群运维 停止某个指定的节点,比如停止第2个节点: ~~~ RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop ~~~ 查看节点3的集群状态: ~~~ ./sbin/rabbitmqctl -n test_rabbit_3 cluster_status ~~~
';