Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:01:57 浏览次数:2 分类:技术文章

本文共 3942 字,大约阅读时间需要 13 分钟。

目录

 

 


 

基本概念

当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理。

主要是让消费者处理完后,回信息给RabbitMQ,然后RabbitMQ才会发送下一个。

使用basicQos(perfetch = 1)

注意:使用公平分发必须关闭自动应答ack改成手动。

 

代码与实例

程序运行截图如下:

生产者:

两个消费者,其中一个的效率是另外一个的2倍(X2的效率)

源码如下:

Send.java

package work;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = ConnectionUtils.getConnect();        Channel channel = connection.createChannel();        //声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个        //限制发送给一个消费者不超过一条        int prefetchCount = 1;        channel.basicQos(prefetchCount);        for(int i = 0; i < 50; i++){            String msg = "Hello World : " + i;            System.out.println("send msg : " + msg);            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());            Thread.sleep(i * 20);        }        channel.close();        connection.close();    }}

Recv1.java

package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtils.getConnect();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //保证一次只发一个        channel.basicQos(1);        System.out.println("recv1 running");        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String msg = new String(body, "utf-8");                System.out.println("Recv[1] msg is : " + msg);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("Recv[1] done");                    /*关闭自动应答,手动处理*/                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        channel.basicConsume(QUEUE_NAME, false, consumer);    }}

Recv2.java

package work;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv2 {    private static final String QUEUE_NAME = "test_work_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = ConnectionUtils.getConnect();        final Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.basicQos(1);        System.out.println("recv2 running");        Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String msg = new String(body, "utf-8");                System.out.println("Recv[2] msg is : " + msg);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("Recv[1] done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        channel.basicConsume(QUEUE_NAME, false, consumer);    }}

源码下载地址:

转载地址:https://it1995.blog.csdn.net/article/details/93596563 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Web前端笔记-使用bootstrap-table.js和bootstrap-table.css使得表格分页
下一篇:Qt文档阅读笔记-GridLayout QML Type解析与实例

发表评论

最新留言

关注你微信了!
[***.104.42.241]2024年04月28日 23时27分59秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章