php-rabbitmq结合rabbitmq_delayed_message_exchange实现延时队列
发布日期:2021-06-29 14:32:39 浏览次数:2 分类:技术文章

本文共 4023 字,大约阅读时间需要 13 分钟。

#查看插件列表rabbitmq-plugins list#如果未安装,则下载wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip#解压unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip#移动到rabbitmq plugins文件夹中,本机的rabbitmq是docker容器,plugins文件夹位于/pluginsmv rabbitmq_delayed_message_exchange-20171201-3.7.x.ez ./plugins#启用rabbitmq_delayed_message_exchange插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange#查看插件启用情况rabbitmq-plugins list

重启rabbitmq

2,php-rabbitmq(自行composer安装)

parseQueueConfig($queue_name); $host = $queue_config['host']; $port = $queue_config['port']; $username = $queue_config['username']; $password = $queue_config['password']; //创建连接 $this->connection = new AMQPStreamConnection($host, $port, $username, $password); $this->channel = $this->connection->channel(); $this->channel->exchange_declare( $queue_name, //exchange类型为x-delayed-message 'x-delayed-message', false, true, false, false, false, //此处是重点,$argument必须使用new AMQPTable()生成 new AMQPTable([ "x-delayed-type" => 'direct' ]) ); //队列声明 $this->channel->queue_declare($queue_name, false, true, false, false); //队列与exchange绑定 $this->channel->queue_bind($queue_name, $queue_name, $queue_name); } //生成消息 private function createDelayMsg($data) {
$this->msg = new AMQPMessage( $data, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, //此处是重点,设置延时时间,单位是毫秒 1s=1000ms,实例延迟20s 'application_headers' => new AMQPTable([ 'x-delay' => 20000, ]) ] ); return $this->msg; } //生产者发送消息,代码中只需调用此方法发送消息即可 public function publishDelayMsg($queue_name, $msg_data) {
$this->initDelay($queue_name); $msg = $this->createDelayMsg($msg_data); $this->channel->basic_publish($msg, $queue_name, $queue_name); $this->channel->close(); $this->connection->close(); } //消费者,代码中调用消费 public function workerMsg($queue_name, $callback = null) {
$this->init($queue_name); $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback); while (count($this->channel->callbacks)) {
$this->channel->wait(); } $this->channel->close(); $this->connection->close(); } //常驻内存 消费实例 protected function execute(InputInterface $input, OutputInterface $output) {
$queue = new QueueService(); $queue->workerMsg('order_check', function ($msg) {
$unique = md5(uniqid(time())); try {
logger("[$unique]order check Queue Received[{
$msg->body}] "); $orderData = json_decode($msg->body, true); $orderId = $orderData['id']; $flag = array_get($orderData, 'flag'); $order = new OrderService(); list($code, $errMsg,) = $order->changeOrderStatus($orderId, $flag); if ($code > 0) {
//意外错误重新分配 logger("[$unique]order check Queue Exception[{
$code} {
$errMsg}] "); $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); } else {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } } catch (\Exception $e) {
//意外错误重新分配 logger("[$unique]order check Queue Exception[{
$e->getCode()} {
$e->getMessage()}] "); $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); } }); }

转载地址:https://chocolate.blog.csdn.net/article/details/106680582 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【网络安全实验】解决 ERROR 1064 (42000): You have an error in your SQL syntax ... near …
下一篇:【C++实现】编译原理 免考小队 FIRSTVT集生成算法

发表评论

最新留言

不错!
[***.144.177.141]2024年04月14日 19时39分40秒