Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)
发布日期:2021-06-30 11:02:00
浏览次数:2
分类:技术文章
本文共 3055 字,大约阅读时间需要 10 分钟。
目录
基本概念
模型如上;
1. 一个生产者,多个消费者;
2. 每个消费者都有自己的队列;
3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列;
4. 每个队列都要绑定到交换机上;
5. 生产者发送的消息经过交换机到达队列就能实现一个消息被多个消费者消费;
这里要注意:
1. 在RabbitMQ中交换机没有存储能力,只有队列里面有;
代码与实例
程序运行截图如下:
生产者:
两个消费者:
RabbitMQ相关:
关键源码如下:
RecvX.java
package ps;import com.rabbitmq.client.*;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { private static final String QUEUE_NAME = "test_queue_fanout_sms"; private static final String EXCHAGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); final Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定到交换机 channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ""); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }}
Send.java
package ps;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import util.ConnectionUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnect(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //并分配 String msg = "Hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("Send : " + msg); channel.close(); connection.close(); }}
ConnectionUtils.java
package util;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnectionUtils { public static Connection getConnect() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/vhost_cff"); factory.setUsername("cff"); factory.setPassword("123"); return factory.newConnection(); }}
源码下载地址:
转载地址:https://it1995.blog.csdn.net/article/details/94168743 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月30日 08时03分58秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Oracle数据库的数据备份,本地,异地,exp-imp,expdp-impdp
2019-04-30
补:Oracle 的数据泵导出(expdp)及导入(impdp)
2019-04-30
oracle 通过操作系统认证的方式登录sys时报错:ORA-01031:权限不足
2019-04-30
关于PL/SQL Developer导入csv文件
2019-04-30
Oracle的 wm_concat 的排序问题,Oracle的 listagg 函数
2019-04-30
Oracle 行转列 pivot函数基本用法
2019-04-30
Oracle 行转列 动态出转换的列
2019-04-30
Oracle 显式游标的简单案例
2019-04-30
Oracle字符串分隔符替换(替换奇数个或偶数个)
2019-04-30
Oracle 利用 UTL_SMTP 包发送邮件
2019-04-30
Oracle 自定义函数实现split功能,支持超长字符串和clob类型的分隔
2019-04-30
Oracle 的循环中的异常捕捉和处理
2019-04-30
Oracle通过pivot和unpivot配合实现行列转换
2019-04-30
给Oracle数据库换一个1522端口的监听
2019-04-30
Excel表格数据生成ECharts图表
2019-04-30
阿里云短信服务python版,pyinstaller打包运行时缺少文件
2019-04-30
Oracle的pfile和spfile的一点理解和笔记
2019-04-30
WebService的简单案例记录(Java)
2019-04-30
Html利用PHP与MySQL交互
2019-04-30