RabbitMQ笔记-使用rabbitmq-c实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:02:00
浏览次数:2
分类:技术文章
本文共 3121 字,大约阅读时间需要 10 分钟。
目录
概念及注意
这里C接口中有2个函数一个是:
amqp_basic_get()
另外一个是:
amqp_basic_consume()
前者可以一条一条的读取,后者是一次读取一大坨数据。所以目前要使用
amqp_basic_get()去读取RabbitMQ的数据,同时使用amqp_read_message()把数据给读出来。
读完一条数据就发一个确认,这样的话即能者多劳。
代码与实例
程序运行截图如下:
消息队列端:
源码如下:
consumer端:
#define _CRT_SECURE_NO_WARNINGS#include#include #include #include #include #include #include using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string name = (char *)argv[1]; string delayStr = (char*)argv[2]; //cout << "The delayStr is : " << delayStr << endl; int delay = std::stoi(delayStr); //cout << "The delay is : " << delay << endl; //string name = "one"; //int delay = 1; string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; exit(1); } amqp_channel_open(conn, 1); while(1){ amqp_basic_get(conn, 1, amqp_cstring_bytes("test_work_queue"), 1); amqp_message_t *msg = new amqp_message_t; amqp_read_message(conn, 1, msg, 0); cout << "【" << name << "】 The result is : " << (char *)msg->body.bytes << endl; amqp_destroy_message(msg); delete msg; delay_msec(1000 * delay); } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}
producer端:
#define _CRT_SECURE_NO_WARNINGS#include#include #include #include #include #include #include using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; } //管道 amqp_channel_open(conn, 1); int i = 0; while(1){ string str = "Hello " + to_string(i); char message[64] = {'\0'}; strcpy(message, str.c_str()); amqp_bytes_t message_bytes; message_bytes.len = sizeof(message); message_bytes.bytes = message; amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("test_work_queue"), 0, 0, nullptr, message_bytes); cout << "send msg over!" << str << endl; i++; delay_msec(100); if(i == 10000) break; } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}
源码下载地址:
转载地址:https://it1995.blog.csdn.net/article/details/94012784 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
做的很好,不错不错
[***.243.131.199]2024年05月01日 08时04分42秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
SQL Server的数据加密简介
2019-04-30
python 部分代码2
2019-04-30
56种哈希类算法
2019-04-30
RF工具ride使用
2019-04-30
pip简单命令
2019-04-30
Python自动化操作Excel表格
2019-04-30
openssl 实现https 网站
2019-04-30
SQLite3日期与时间,常见函数
2019-04-30
sql 添加时间段内随机时间
2019-04-30
Python 字符串
2019-04-30
十、数值计算函数
2019-04-30
十一、打印和打印机设置函数
2019-04-30
十二、注册表操作函数
2019-04-30
C++的除法需要留意的几点情况
2019-04-30
C++打印三角形、四边形
2019-04-30
C++程序的基本组成简介
2019-04-30
JavaScript变量及访问方式介绍
2019-04-30
centos7 hbase1.4.13+hadoop2.7.1+单机环境搭建
2019-04-30
十四、系统与环境函数
2019-04-30
十五、定时函数
2019-04-30