Fanout Distribution Mode (PHP Version)

Last updated: 2022-04-02 04:02:11

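[TOC]

## Overview

* Push a message to several different queues, for example sending a log entry both to a log store and to screen output.
* Unlike direct mode, fanout mode does not match on a routing key; the exchange delivers each message to every queue that is bound to it.

### code

Publishing messages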
fanout.php
```
<?php
// fanout.php: publish 100 log messages to a durable fanout exchange.
$exchangeName = 'log';
$message = 'log--';

$connection = new AMQPConnection(array(
    'host' => '10.99.121.137',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'superrd',
    'password' => 'superrd',
));

try {
    $connection->connect();
    // Check the connection state explicitly instead of relying on connect()'s return value.
    if (!$connection->isConnected()) {
        die("Cannot connect to the broker!\n");
    }

    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_FANOUT);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();

    for ($i = 0; $i < 100; $i++) {
        // A fanout exchange ignores the routing key, so it is left empty.
        $exchange->publish($message . $i, '');
        var_dump("[x] Sent $message $i");
    }
} catch (AMQPConnectionException $e) {
    var_dump($e);
    exit();
}

$connection->disconnect();
```
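The routing rule from the overview can be illustrated without a broker. The following is only a toy in-memory model, not RabbitMQ's implementation: the function `route()`, the `$bindings` array, and the key `'other-key'` are hypothetical names made up for this sketch. It assumes PHP 7.1 or later and shows that a fanout exchange reaches every bound queue whatever routing key the publisher sends, while a direct exchange only reaches queues whose binding key equals the routing key.

```php
<?php
// Toy in-memory model of exchange routing; it does not talk to RabbitMQ.
// All names here (route, $bindings, 'other-key') are hypothetical.

/**
 * Return the names of the queues that would receive a message.
 *
 * @param string $type       'fanout' or 'direct'
 * @param array  $bindings   list of [queueName, bindingKey] pairs
 * @param string $routingKey routing key supplied by the publisher
 */
function route(string $type, array $bindings, string $routingKey): array
{
    $targets = [];
    foreach ($bindings as [$queueName, $bindingKey]) {
        // fanout: every bound queue matches; direct: the keys must be equal.
        if ($type === 'fanout' || $bindingKey === $routingKey) {
            $targets[] = $queueName;
        }
    }
    return $targets;
}

// The two queues used on this page, each bound with an empty key.
$bindings = [['queuea', ''], ['queueb', '']];

$fanoutTargets = route('fanout', $bindings, 'other-key');
$directTargets = route('direct', $bindings, 'other-key');

echo 'fanout: ', implode(',', $fanoutTargets), "\n";
echo 'direct: ', implode(',', $directTargets), "\n";
```

In this model the fanout call returns both `queuea` and `queueb`, while the direct call returns nothing because neither binding key equals `'other-key'`. This is why fanout.php can publish with an empty routing key.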

Consumer ca
ca.php
```
<?php
// ca.php: consume messages from queue "queuea", bound to the fanout exchange "log".
$exchangeName = 'log';
$queueName = 'queuea';
$routeKey = ''; // ignored by fanout exchanges

$connection = new AMQPConnection(array(
    'host' => '10.99.121.137',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'superrd',
    'password' => 'superrd',
));
$connection->connect();
if (!$connection->isConnected()) {
    die("Cannot connect to the broker!\n");
}

$channel = new AMQPChannel($connection);

// Declare the same durable fanout exchange as the publisher.
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

// Declare a durable queue and bind it to the exchange.
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

// Receive messages in blocking mode.
echo "Message:\n";
// consume() blocks and calls processMessage() for each message
// until the callback returns false.
$queue->consume('processMessage');
// $queue->consume('processMessage', AMQP_AUTOACK); // automatic ACK

$connection->disconnect();

/**
 * Consumer callback: handles one message.
 */
function processMessage($envelope, $q)
{
    $msg = $envelope->getBody();
    sleep(1);
    echo $msg . "\n"; // process the message
    $q->ack($envelope->getDeliveryTag()); // send the ACK manually
}
```

Consumer cb
cb.php
```
<?php
// cb.php: consume messages from queue "queueb", bound to the fanout exchange "log".
$exchangeName = 'log';
$queueName = 'queueb';
$routeKey = ''; // ignored by fanout exchanges

$connection = new AMQPConnection(array(
    'host' => '10.99.121.137',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'superrd',
    'password' => 'superrd',
));
$connection->connect();
if (!$connection->isConnected()) {
    die("Cannot connect to the broker!\n");
}

$channel = new AMQPChannel($connection);

// Declare the same durable fanout exchange as the publisher.
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

// Declare a durable queue and bind it to the exchange.
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

// Receive messages in blocking mode.
echo "Message:\n";
// consume() blocks and calls processMessage() for each message
// until the callback returns false.
$queue->consume('processMessage');
// $queue->consume('processMessage', AMQP_AUTOACK); // automatic ACK

$connection->disconnect();

/**
 * Consumer callback: handles one message.
 */
function processMessage($envelope, $q)
{
    $msg = $envelope->getBody();
    sleep(1);
    echo $msg . "\n"; // process the message
    $q->ack($envelope->getDeliveryTag()); // send the ACK manually
}
```
