RabbitMQ笔记-使用rabbitmq-c实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:02:00 浏览次数:2 分类:技术文章

本文共 3121 字,大约阅读时间需要 10 分钟。

目录

 

 


 

概念及注意

这里C接口中有2个函数一个是:

amqp_basic_get()

另外一个是:

amqp_basic_consume()

前者可以一条一条的读取,后者是一次读取一大坨数据。所以目前要使用

amqp_basic_get()去读取RabbitMQ的数据,同时使用amqp_read_message()把数据给读出来。

读完一条数据就发一个确认,这样的话即能者多劳。

 

代码与实例

程序运行截图如下:

消息队列端:

源码如下:

consumer端:

#define _CRT_SECURE_NO_WARNINGS#include 
#include
#include
#include
#include
#include
#include
using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string name = (char *)argv[1]; string delayStr = (char*)argv[2]; //cout << "The delayStr is : " << delayStr << endl; int delay = std::stoi(delayStr); //cout << "The delay is : " << delay << endl; //string name = "one"; //int delay = 1; string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; exit(1); } amqp_channel_open(conn, 1); while(1){ amqp_basic_get(conn, 1, amqp_cstring_bytes("test_work_queue"), 1); amqp_message_t *msg = new amqp_message_t; amqp_read_message(conn, 1, msg, 0); cout << "【" << name << "】 The result is : " << (char *)msg->body.bytes << endl; amqp_destroy_message(msg); delete msg; delay_msec(1000 * delay); } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}

producer端:

#define _CRT_SECURE_NO_WARNINGS#include 
#include
#include
#include
#include
#include
#include
using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; } //管道 amqp_channel_open(conn, 1); int i = 0; while(1){ string str = "Hello " + to_string(i); char message[64] = {'\0'}; strcpy(message, str.c_str()); amqp_bytes_t message_bytes; message_bytes.len = sizeof(message); message_bytes.bytes = message; amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("test_work_queue"), 0, 0, nullptr, message_bytes); cout << "send msg over!" << str << endl; i++; delay_msec(100); if(i == 10000) break; } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}

 

源码下载地址:

转载地址:https://it1995.blog.csdn.net/article/details/94012784 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)
下一篇:C++设计模式-观察者模式

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2024年05月01日 08时04分42秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章