Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:01:57
浏览次数:2
分类:技术文章
本文共 3942 字,大约阅读时间需要 13 分钟。
目录
基本概念
当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理。
主要是让消费者处理完后,回信息给RabbitMQ,然后RabbitMQ才会发送下一个。
使用basicQos(perfetch = 1)
注意:使用公平分发必须关闭自动应答ack改成手动。
代码与实例
程序运行截图如下:
生产者:
两个消费者,其中一个的效率是另外一个的2倍(X2的效率)
源码如下:
Send.java
package work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个 //限制发送给一个消费者不超过一条 int prefetchCount = 1; channel.basicQos(prefetchCount); for(int i = 0; i < 50; i++){ String msg = "Hello World : " + i; System.out.println("send msg : " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); } channel.close(); connection.close(); }}
Recv1.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //保证一次只发一个 channel.basicQos(1); System.out.println("recv1 running"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[1] msg is : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[1] done"); /*关闭自动应答,手动处理*/ channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); }}
Recv2.java
package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println("recv2 running"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("Recv[2] msg is : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Recv[1] done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); }}
源码下载地址:
转载地址:https://it1995.blog.csdn.net/article/details/93596563 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
关注你微信了!
[***.104.42.241]2024年04月28日 23时27分59秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
csdn搜索博主的文章
2019-04-30
ORA-12552: TNS:operation was interrupted
2019-04-30
MyISAM表修复
2019-04-30
GNS3中关联使用SecureCRT
2019-04-30
oracle日期的相关有用统计SQL
2019-04-30
oracle数据库LOB管理
2019-04-30
oracle数据库BFILE的应用demo
2019-04-30
Oracle数据库自定义类型--对象类型
2019-04-30
oracle分区详解
2019-04-30
SuperSlide.js
2019-04-30
oracle查看索引所在表空间及索引重建
2019-04-30
网络知识的三组数
2019-04-30
让nginx支持ipv6
2019-04-30
dbms_metadata.get_ddl的用法
2019-04-30
oracle中PL/SQL学习笔记
2019-04-30
telnet命令关闭tomcat
2019-04-30
tomcat隐藏版本信息
2019-04-30
配置tomcat的错误404页面
2019-04-30
oracle游标详解
2019-04-30
给第三方用户提供oracle查询
2019-04-30