本文共 8901 字,大约阅读时间需要 29 分钟。
在terminal下面运行
./yii test/a-m-q-p
yii TestController
namespace app\commands;
use app\lib\AMQP\Pusher;
class TestController extends Controller {
public function actionIndex() {
phpinfo();die;
echo "console test/index!\n";
return 0;
}
public function actionAMQP() {
// var_dump(\Yii::$app->params['amqp']); exit();
echo "console test/amqp!\n";
$res = Pusher::pushSMSToMQ($mobile='13692177080'
// , $sms_msg='【运呗】验证码1234,您正在注册成为ooo用户,感谢您的支持!'
// , $msg_platform='clysms'
, $sms_msg='【运呗】aseqwrqwerqwe成立业短信平台 2.2.个性短信接口测试,感谢您的支持! 测试'
, $msg_platform='clyafd_sale'
, $related_uid=1
, $related_row_id=time()
, $msg_datetime=''
, $related_table=''
);
var_dump($res);
var_dump(Pusher::getError());
return 0;
}
}
Pusher.php
/**
* Advanced Message Queuing Protocol 发送消息类
*
* @copyright leeyi.net @ 2014-2020
* @author leeyi
*/
namespace app\lib\AMQP;
class Pusher {
static public $msg_check_key_prefix = 'push_to_mq:';
static private $error = '';
/**
* 获取最近一次错误
*/
static public function getError() {
return self::$error;
}
/**
* 短信 发送MQ静态方法
*
* @param string $moible 接受短信的手机号码
* @param string $sms_msg 短信内容,内容格式为,字符串,或者json {"temp_name":"temp_name","params":{"key1":"val1","key2":"val2"}}
* @param string $msg_platform 消息平台,指定用哪个平台发送消息
* @param int $related_uid 关联业务UID
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
* @param datetime $msg_datetime 定时时间,为空时表示立即发送(选填) 格式:yyyy-MM-dd HH:mm:ss
* @param string $related_table 关联数据库表名称
*
* @access public
* @author leeyi
* @return string
*/
static public function pushSMSToMQ($mobile=''
, $sms_msg=''
, $msg_platform=''
, $related_uid=0
, $related_row_id=0
, $msg_datetime=''
, $related_table=''
) {
self::$error = '';
if(
!self::isMobile($mobile)
|| empty($sms_msg)
|| empty($msg_platform)
|| $related_uid<0
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['msg_platform'] = $msg_platform;
$msg['msg_datetime'] = $msg_datetime;
$msg['mobile'] = $mobile;
$msg['related_uid'] = intval($related_uid);
$msg['related_row_id'] = intval($related_row_id);
$msg['related_table'] = strval($related_table);
$msg_md5 = md5(json_encode($msg));
// sms_msg 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['sms_msg'] = $sms_msg;
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.sms', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* Email 发送MQ静态方法
*
* @param string $to_email 收件人Email地址
* @param string $name 收件人称呼
* @param string $msg_title 消息标题
* @param string $msg_body 消息内容
* @param int $related_uid 关联业务UID
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
*
* @access public
* @author leeyi
* @return string
*/
static public function pushEmailToMQ($to_email
, $name
, $msg_title
, $msg_body
, $related_uid=0
, $related_row_id=0
, $related_table=''
) {
self::$error = '';
if(
!self::isEmail($to_email)
|| empty($msg_title)
|| empty($msg_body)
|| $related_uid<0
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['to_email'] = $to_email;
$msg['name'] = $name;
$msg['msg_title'] = $msg_title;
$msg['related_uid'] = intval($related_uid);
$msg['related_row_id'] = intval($related_row_id);
$msg['related_table'] = strval($related_table);
$msg['msg_platform'] = 'email';
$msg_md5 = md5(json_encode($msg));
// msg_body 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['msg_body'] = $msg_body;
$message = ['msg_md5'=>$msg_md5, 'msg'=>$msg];
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.email', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* Jpush 发送MQ静态方法
*
* @param string $title 推送内容标题
* @param string $content 推送内容
* @param array $related_uids 关联业务UID 没有需要设置为 array(0)
* @param int $related_row_id 关联业务ID 主要用该值保证消息的唯一性
*
* @access public
* @author leeyi
* @return string
*/
static public function jpushToMQ($title
, $content
, $related_row_id=0
, $related_uids=[]
, $platform='afd'
) {
self::$error = '';
if(
empty($content)
|| !is_array($related_uids)
|| $related_row_id<0
) {
self::$error = '参数不合法';
return false;
}
$msg = [];
$msg['title'] = $title;
$msg['related_uids'] = $related_uids;
$msg['related_row_id'] = intval($related_row_id);
$msg['platform'] = $platform;
$msg_md5 = md5(json_encode($msg));
// content 不参与md5,因为它很有可能附带时间参数,导致md5结果每次都会变化
$msg['content'] = $content;
$msg['msg_platform'] = 'jpush';
$message = array('msg_md5' => $msg_md5, 'msg' => $msg);
$res = self::checkMsg($msg_md5);
if( $res ) {
self::$error = '已经发送过了,请不要重复请求';
return false;
}
$mq = new Rabbit();
$res = $mq->sending($queue_name='push.jpush', $message, $check_rule_time=time() );
if( $res ) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
//set方法的第一个参数是我们的数据对应的key值,方便我们获取到
//第二个参数即是我们要缓存的数据
//第三个参数是缓存时间,如果是0,意味着永久缓存。默认是0
\Yii::$app->cache->set($cache_key, 1, 86400);
}
return $res;
}
/**
* 检查消息是否发送过
* @param string $msg_md5 32位 md5,消息唯一标示
*
* @access private
* @author leeyi
* @return boolean [true|false] 返回true 标示已经发送过消息了,不需要在发送了
*/
static private function checkMsg($msg_md5) {
$cache_key = self::$msg_check_key_prefix.$msg_md5;
$count = \Yii::$app->cache->get($cache_key);
return $count>0 ? true : false;
}
static private function isMobile($mobile) {
return preg_match("/^1\d{10}$/", $mobile);
}
static private function isEmail($email) {
$pattern = "/^([0-9A-Za-z\\-_\\.]+)@([0-9a-z]+\\.[a-z]{2,3}(\\.[a-z]{2})?)$/i";
return preg_match($pattern, $email);
}
}
Rabbit.php
/**
* @desc Advanced Message Queuing Protocol 发送消息类
*
* @name Rabbit
* @copyright leeyi.net @ 2014-2020
* @author leeyi
*/
namespace app\lib\AMQP;
/**
* 检查环境是否支持
*/
extension_loaded("amqp") or die("not extension amqp!");
class Rabbit {
/**
* rabbitmq链接
* @name $connection
*/
public $connection=false;
public $channel=false;
public $error = '';
/**
* 构造函数,初始化数据库链接等
* @author leeyi
*/
public function __construct($options=array()) {
$amqp = \Yii::$app->params['amqp'];
if( empty($amqp) ) {
exit('not config rabbitmq');
}
$this->connection = $this->connect($options);
$this->channel = new \AMQPChannel($this->connection);
}
/**
* 获取最近一次错误
*/
public function getError() {
return $this->error;
}
/**
* 持久化的消息发送
* @access public
* @param string $queue_name 触发点标识(这里作为队列名称)
* @param array $message 消息主体,一个多维数组,只包含元素 msg_md5 和 msg 两个元素,msg 可以是任意数组
* @param int $check_rule_time 触发点检查“奖励规则的当前时间“,作为 mq queue 的 timestamp attr
* @return true
* @author leeyi
*/
public function sending($queue_name, $message, $check_rule_time) {
/**
* 下面的 message 必须是 包含 msg_md5 和 msg 元素的数组,不然后端程序不会处理该消息
* msg_md5 消息身份标识
*/
if(
!isset($message['msg_md5']) ||
empty($message['msg_md5']) ||
!isset($message['msg']) ||
$check_rule_time<1
) {
return false;
}
$direct_exchange_name = 'direct.exchange.'.$queue_name;
$routingkey_name = 'routingkey.'.$queue_name;
$queue_name = 'queue.'.$queue_name;
//创建exchange名称和类型
$exchange = new \AMQPExchange($this->channel);
$exchange->setName($direct_exchange_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); // 持久化
$exchange->declareExchange();
//创建queue名称,使用exchange,绑定routingkey
$queue = new \AMQPQueue($this->channel);
$queue->setName($queue_name);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($direct_exchange_name, $routingkey_name);
$attrs = [];
$attrs['content_type'] = 'application/json';
$attrs['content_encoding'] = 'utf-8';
$attrs['timestamp'] = $check_rule_time;
// $attrs['delivery_mode'] = '2'; // 持久化消息
// 消息发布
// $flag=AMQP_MANDATORY 在发布消息时,消息必须被路由到一个有效的队列中。如果不是,将返回一个错误。
$this->channel->startTransaction();
$exchange->publish(json_encode($message), $routingkey_name, $flag=AMQP_MANDATORY, $attrs);
$this->channel->commitTransaction();
return true;
}
/**
* 链接rabbitmq 只运行在够着函数使用
* @param $options['host'] string 服务地址
* @param $options['port'] int 端口
* @param $options['login'] string 登录账号
* @param $options['password'] string 账号密码
* @param $options['vhost'] string 虚拟站点
* @access protected
* @author leeyi
* @return $connect
*/
protected function connect($options=[]) {
//连接RabbitMQ
$amqp = \Yii::$app->params['amqp'];
$options = array_merge( [
'host'=>$amqp['host'] ,
'port'=> $amqp['port'],
'vhost'=>$amqp['vhost'],
'login'=>$amqp['user'],
'password'=>$amqp['password'],
], (array)$options);
$connection = new \AMQPConnection($options);
$connection->connect();
return $connection;
}
/**
* 析构函数
* @access public
*/
public function __destruct() {
// 断开链接
$this->connection && $this->connection->disconnect();
}
}
转载地址:https://blog.csdn.net/weixin_34185033/article/details/115634141 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!