php rabbitmq yii2,PHP rabbitmq producer for yii2
发布日期:2021-06-24 17:40:00 浏览次数:2 分类:技术文章

本文共 8901 字,大约阅读时间需要 29 分钟。

在terminal下面运行

./yii test/a-m-q-p

yii TestController

namespace app\commands;

use app\lib\AMQP\Pusher;

class TestController extends Controller {

public function actionIndex() {

phpinfo();die;

echo "console test/index!\n";

return 0;

}

public function actionAMQP() {

// var_dump(\Yii::$app->params['amqp']); exit();

echo "console test/amqp!\n";

$res = Pusher::pushSMSToMQ($mobile='13692177080'

// , $sms_msg='【运呗】验证码1234,您正在注册成为ooo用户,感谢您的支持!'

// , $msg_platform='clysms'

, $sms_msg='【运呗】aseqwrqwerqwe成立业短信平台 2.2.个性短信接口测试,感谢您的支持! 测试'

, $msg_platform='clyafd_sale'

, $related_uid=1

, $related_row_id=time()

, $msg_datetime=''

, $related_table=''

);

var_dump($res);

var_dump(Pusher::getError());

return 0;

}

}

Pusher.php

/**

* Advanced Message Queuing Protocol 发送消息类

*

* @copyright leeyi.net @ 2014-2020

* @author leeyi

*/

namespace app\lib\AMQP;

class Pusher {

static public $msg_check_key_prefix = 'push_to_mq:';

static private $error = '';

/**

* 获取最近一次错误

*/

static public function getError() {

return self::$error;

}

/**

* 短信 发送MQ静态方法

*

* @param string $moible 接受短信的手机号码

* @param string $sms_msg 短信内容,内容格式为,字符串,或者json {"temp_name":"temp_name","params":{"key1":"val1","key2":"val2"}}

* @param string $msg_platform 消息平台,指定用哪个平台发送消息

* @param int $related_uid 关联业务UID

* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性

* @param datetime $msg_datetime 定时时间,为空时表示立即发送(选填) 格式:yyyy-MM-dd HH:mm:ss

* @param string $related_table 关联数据库表名称

*

* @access public

* @author leeyi

* @return string

*/

static public function pushSMSToMQ($mobile=''

, $sms_msg=''

, $msg_platform=''

, $related_uid=0

, $related_row_id=0

, $msg_datetime=''

, $related_table=''

) {

self::$error = '';

if(

!self::isMobile($mobile)

|| empty($sms_msg)

|| empty($msg_platform)

|| $related_uid<0

|| $related_row_id<0

) {

self::$error = '参数不合法';

return false;

}

$msg = [];

$msg['msg_platform'] = $msg_platform;

$msg['msg_datetime'] = $msg_datetime;

$msg['mobile'] = $mobile;

$msg['related_uid'] = intval($related_uid);

$msg['related_row_id'] = intval($related_row_id);

$msg['related_table'] = strval($related_table);

$msg_md5 = md5(json_encode($msg));

// sms_msg 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化

$msg['sms_msg'] = $sms_msg;

$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];

$res = self::checkMsg($msg_md5);

if( $res ) {

self::$error = '已经发送过了,请不要重复请求';

return false;

}

$mq = new Rabbit();

$res = $mq->sending($queue_name='push.sms', $message, $check_rule_time=time() );

if( $res ) {

$cache_key = self::$msg_check_key_prefix.$msg_md5;

//set方法的第一个参数是我们的数据对应的key值,方便我们获取到

//第二个参数即是我们要缓存的数据

//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0

\Yii::$app->cache->set($cache_key, 1, 86400);

}

return $res;

}

/**

* Email 发送MQ静态方法

*

* @param string $to_email 收件人Email地址

* @param string $name 收件人称呼

* @param string $msg_title 消息标题

* @param string $msg_body 消息内容

* @param int $related_uid 关联业务UID

* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性

*

* @access public

* @author leeyi

* @return string

*/

static public function pushEmailToMQ($to_email

, $name

, $msg_title

, $msg_body

, $related_uid=0

, $related_row_id=0

, $related_table=''

) {

self::$error = '';

if(

!self::isEmail($to_email)

|| empty($msg_title)

|| empty($msg_body)

|| $related_uid<0

|| $related_row_id<0

) {

self::$error = '参数不合法';

return false;

}

$msg = [];

$msg['to_email'] = $to_email;

$msg['name'] = $name;

$msg['msg_title'] = $msg_title;

$msg['related_uid'] = intval($related_uid);

$msg['related_row_id'] = intval($related_row_id);

$msg['related_table'] = strval($related_table);

$msg['msg_platform'] = 'email';

$msg_md5 = md5(json_encode($msg));

// msg_body 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化

$msg['msg_body'] = $msg_body;

$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];

$res = self::checkMsg($msg_md5);

if( $res ) {

self::$error = '已经发送过了,请不要重复请求';

return false;

}

$mq = new Rabbit();

$res = $mq->sending($queue_name='push.email', $message, $check_rule_time=time() );

if( $res ) {

$cache_key = self::$msg_check_key_prefix.$msg_md5;

//set方法的第一个参数是我们的数据对应的key值,方便我们获取到

//第二个参数即是我们要缓存的数据

//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0

\Yii::$app->cache->set($cache_key, 1, 86400);

}

return $res;

}

/**

* Jpush 发送MQ静态方法

*

* @param string $title 推送内容标题

* @param string $content 推送内容

* @param array $related_uids 关联业务UID 没有需要设置为 array(0)

* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性

*

* @access public

* @author leeyi

* @return string

*/

static public function jpushToMQ($title

, $content

, $related_row_id=0

, $related_uids=[]

, $platform='afd'

) {

self::$error = '';

if(

empty($content)

|| !is_array($related_uids)

|| $related_row_id<0

) {

self::$error = '参数不合法';

return false;

}

$msg = [];

$msg['title'] = $title;

$msg['related_uids'] = $related_uids;

$msg['related_row_id'] = intval($related_row_id);

$msg['platform'] = $platform;

$msg_md5 = md5(json_encode($msg));

// content 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化

$msg['content'] = $content;

$msg['msg_platform'] = 'jpush';

$message = array('msg_md5' => $msg_md5, 'msg' => $msg);

$res = self::checkMsg($msg_md5);

if( $res ) {

self::$error = '已经发送过了,请不要重复请求';

return false;

}

$mq = new Rabbit();

$res = $mq->sending($queue_name='push.jpush', $message, $check_rule_time=time() );

if( $res ) {

$cache_key = self::$msg_check_key_prefix.$msg_md5;

//set方法的第一个参数是我们的数据对应的key值,方便我们获取到

//第二个参数即是我们要缓存的数据

//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0

\Yii::$app->cache->set($cache_key, 1, 86400);

}

return $res;

}

/**

* 检查消息是否发送过

* @param string $msg_md5 32位 md5,消息唯一标示

*

* @access private

* @author leeyi

* @return boolean [true|false] 返回true 标示已经发送过消息了,不需要在发送了

*/

static private function checkMsg($msg_md5) {

$cache_key = self::$msg_check_key_prefix.$msg_md5;

$count = \Yii::$app->cache->get($cache_key);

return $count>0 ? true : false;

}

static private function isMobile($mobile) {

return preg_match("/^1\d{10}$/", $mobile);

}

static private function isEmail($email) {

$pattern = "/^([0-9A-Za-z\\-_\\.]+)@([0-9a-z]+\\.[a-z]{2,3}(\\.[a-z]{2})?)$/i";

return preg_match($pattern, $email);

}

}

Rabbit.php

/**

* @desc Advanced Message Queuing Protocol 发送消息类

*

* @name Rabbit

* @copyright leeyi.net @ 2014-2020

* @author leeyi

*/

namespace app\lib\AMQP;

/**

* 检查环境是否支持

*/

extension_loaded("amqp") or die("not extension amqp!");

class Rabbit {

/**

* rabbitmq链接

* @name $connection

*/

public $connection=false;

public $channel=false;

public $error = '';

/**

* 构造函数,初始化数据库链接等

* @author leeyi

*/

public function __construct($options=array()) {

$amqp = \Yii::$app->params['amqp'];

if( empty($amqp) ) {

exit('not config rabbitmq');

}

$this->connection = $this->connect($options);

$this->channel = new \AMQPChannel($this->connection);

}

/**

* 获取最近一次错误

*/

public function getError() {

return $this->error;

}

/**

* 持久化的消息发送

* @access public

* @param string $queue_name 触发点标识(这里作为队列名称)

* @param array $message 消息主体,一个多维数组,只包含元素 msg_md5 和 msg 两个元素,msg 可以是任意数组

* @param int $check_rule_time 触发点检查“奖励规则的当前时间“,作为 mq queue 的 timestamp attr

* @return true

* @author leeyi

*/

public function sending($queue_name, $message, $check_rule_time) {

/**

* 下面的 message 必须是 包含 msg_md5 和 msg 元素的数组,不然后端程序不会处理该消息

* msg_md5 消息身份标识

*/

if(

!isset($message['msg_md5']) ||

empty($message['msg_md5']) ||

!isset($message['msg']) ||

$check_rule_time<1

) {

return false;

}

$direct_exchange_name = 'direct.exchange.'.$queue_name;

$routingkey_name = 'routingkey.'.$queue_name;

$queue_name = 'queue.'.$queue_name;

//创建exchange名称和类型

$exchange = new \AMQPExchange($this->channel);

$exchange->setName($direct_exchange_name);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->setFlags(AMQP_DURABLE); // 持久化

$exchange->declareExchange();

//创建queue名称,使用exchange,绑定routingkey

$queue = new \AMQPQueue($this->channel);

$queue->setName($queue_name);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

$queue->bind($direct_exchange_name, $routingkey_name);

$attrs = [];

$attrs['content_type'] = 'application/json';

$attrs['content_encoding'] = 'utf-8';

$attrs['timestamp'] = $check_rule_time;

// $attrs['delivery_mode'] = '2'; // 持久化消息

// 消息发布

// $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。

$this->channel->startTransaction();

$exchange->publish(json_encode($message), $routingkey_name, $flag=AMQP_MANDATORY, $attrs);

$this->channel->commitTransaction();

return true;

}

/**

* 链接rabbitmq 只运行在够着函数使用

* @param $options['host'] string 服务地址

* @param $options['port'] int 端口

* @param $options['login'] string 登录账号

* @param $options['password'] string 账号密码

* @param $options['vhost'] string 虚拟站点

* @access protected

* @author leeyi

* @return $connect

*/

protected function connect($options=[]) {

//连接RabbitMQ

$amqp = \Yii::$app->params['amqp'];

$options = array_merge( [

'host'=>$amqp['host'] ,

'port'=> $amqp['port'],

'vhost'=>$amqp['vhost'],

'login'=>$amqp['user'],

'password'=>$amqp['password'],

], (array)$options);

$connection = new \AMQPConnection($options);

$connection->connect();

return $connection;

}

/**

* 析构函数

* @access public

*/

public function __destruct() {

// 断开链接

$this->connection && $this->connection->disconnect();

}

}

转载地址:https://blog.csdn.net/weixin_34185033/article/details/115634141 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:php选择什么框架,关于如何正确使用PHP框架及如何选择框架之我见
下一篇:php charat,如何在PHP中生成字符串的所有排列?

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月05日 03时55分10秒