Direct mode, PHP version

Last updated: 2022-04-02 04:02:08

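[TOC]

## Overview

> [Reference: CSDN](https://blog.csdn.net/Super_RD/article/details/70792746)

* No fuzzy (pattern) matching: a message is delivered only when its RouteKey is exactly the same as the key the queue was bound with.
* If one consumer cannot keep up, start another one with unchanged code, i.e. with the same RouteKey.

### code

Producer code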
p.php

```
<?php
$queueName    = 'superrd';
$exchangeName = 'superrd';
$routeKey     = 'superrd';
$message      = 'task--';

$connection = new AMQPConnection(array(
    'host'     => '10.99.121.137',
    'port'     => '5672',
    'vhost'    => '/',
    'login'    => 'superrd',
    'password' => 'superrd',
));
$connection->connect() or die("Cannot connect to the broker!\n");

try {
    $channel = new AMQPChannel($connection);

    // Declare a durable direct exchange
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();

    // Declare a durable queue and bind it to the exchange with the route key
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();
    $queue->bind($exchangeName, $routeKey);

    // Publish 100 test messages
    for ($i = 0; $i < 100; $i++) {
        $exchange->publish($message . $i, $routeKey);
        var_dump("[x] Sent $message $i");
    }
} catch (AMQPConnectionException $e) {
    var_dump($e);
    exit();
}

$connection->disconnect();
```
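The exact-match rule from the overview can be tried out without a broker. The sketch below is only a model of the behaviour, not php-amqp code: `routeDirect()` and the second queue `other` with its key `superrd.extra` are hypothetical names introduced for illustration.

```php
<?php
// Hypothetical helper, not part of the php-amqp extension: models how a
// direct exchange chooses the queues that receive a message.
function routeDirect(array $bindings, string $routingKey): array
{
    $targets = [];
    foreach ($bindings as [$queueName, $bindingKey]) {
        // Direct exchange: strict equality only, no wildcard or prefix matching.
        if ($bindingKey === $routingKey) {
            $targets[] = $queueName;
        }
    }
    return $targets;
}

$bindings = [
    ['superrd', 'superrd'],      // the binding created in p.php
    ['other', 'superrd.extra'],  // hypothetical second queue
];

print_r(routeDirect($bindings, 'superrd'));        // only the 'superrd' queue
print_r(routeDirect($bindings, 'superrd.extra'));  // only the 'other' queue
print_r(routeDirect($bindings, 'super'));          // no queue: a prefix is not a match
```

A key that merely resembles the binding key matches nothing, which is why the producer and every consumer here use the same `superrd` value.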

Consumer code: c1.php and c2.php are identical
c1.php

```
<?php
$queueName    = 'superrd';
$exchangeName = 'superrd';
$routeKey     = 'superrd'; // must be the same key the producer uses

// Same connection settings as p.php
$connection = new AMQPConnection(array(
    'host'     => '10.99.121.137',
    'port'     => '5672',
    'vhost'    => '/',
    'login'    => 'superrd',
    'password' => 'superrd',
));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);

// Receive messages in blocking mode
echo "Message:\n";
while (true) {
    $queue->consume('processMessage');
    // Automatic ACK
    // $queue->consume('processMessage', AMQP_AUTOACK);
}
$connection->disconnect();

/*
 * Consumer callback
 * Processes one message
 */
function processMessage($envelope, $q)
{
    $msg = $envelope->getBody();
    sleep(1);          // sleep 1 second to simulate task processing
    echo $msg . "\n";  // process the message
    $q->ack($envelope->getDeliveryTag()); // send the ACK manually
}
```
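When c1.php and c2.php both consume from the same queue, the broker hands each message to only one of them, by default taking the consumers in turn. The sketch below models that idea in plain PHP so it can be run without a broker. `dispatchRoundRobin()` is a hypothetical helper, and the real distribution also depends on prefetch settings and on how quickly each consumer acknowledges.

```php
<?php
// Hypothetical simulation, not php-amqp code: hands the messages of one queue
// to the consumers in turn, so each message goes to exactly one consumer.
function dispatchRoundRobin(array $messages, array $consumerNames): array
{
    $consumerNames = array_values($consumerNames);
    $count = count($consumerNames);
    if ($count === 0) {
        throw new InvalidArgumentException('At least one consumer is required');
    }

    $received = array_fill_keys($consumerNames, []);
    foreach (array_values($messages) as $i => $message) {
        // Message i goes to consumer (i mod number of consumers).
        $received[$consumerNames[$i % $count]][] = $message;
    }
    return $received;
}

// Six messages in the same format the producer uses.
$messages = [];
for ($i = 0; $i < 6; $i++) {
    $messages[] = 'task--' . $i;
}

$result = dispatchRoundRobin($messages, ['c1', 'c2']);
print_r($result);
```

In this model c1 receives the even-numbered tasks and c2 the odd-numbered ones. No message is processed twice, which is what makes adding a second identical consumer a way to increase throughput.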
