本文共 3172 字,大约阅读时间需要 10 分钟。
步骤1
- 安装erlang
- 安装rabbitMQ
- 做好准备,连接成功,访问
这些我就不多说了,rabbitmq可以去官网下载,每个版本都有对应的erlang版本说明,这里推荐下载安装,另外我还参考了:这个博客
讲一下我遇到的问题:由于我第一次安装了6.2版本,与我的rabbitmq版本不匹配,但是卸载了之后,安装对应版本,依旧不可以,报错:Your installed version of Erlang (6.2) is too old. Please install a more recent version
解决:erlang卸载的时候用自带的进行卸载,我之前是用的window自带的卸载,linux系统就不说了
这一切准备好之后,进入预备状态
如果你还不知道什么是rabbitMQ,可以参考以及官方网站
七种消息模型,随后我会用代码实现
步骤2
在新建的maven项目中加入依赖
com.rabbitmq amqp-client 5.7.2
开启本地的rabbitmq服务,首先点击admin---》add a user---》分配权限
开启服务的时候我遇到的问题:此时已经按照这个博客配置好,但是点击在这里的时候我报了completed with 0 plugins.rabbitmq(正常是3)
解决:首先关闭服务,然后运行rabbitmq-plugins enable rabbitmq_management命令在sbin目录下
步骤3
模型1:点对点通信
首先建立连接工具类
public class ConnectionUtil { private static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); //设置连接rabbitmq主机 connectionFactory.setHost("localhost"); //设置端口号,TCP通信的端口号的5672 connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123"); } public static Connection connection() { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } public static void closeConnectionAndChannel(Channel channel, Connection connection) { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } }}
建立生产者
//生产消息 @Test public void testSendMessage() throws IOException, TimeoutException { //获取连接对象 Connection connection = ConnectionUtil.connection(); //获取连接通道 Channel channel = connection.createChannel(); //通道绑定对应消息队列 //参数1:队列名称 如果不存在自动创建 //参数2:用来定义队列特性是否需要持久化 true 持久化队列 false 不持久化 //参数3:exclusive 是否独占队列 //参数4:autoDelete:在消费完是否自动展出队列 channel.queueDeclare("hello",false, false, false, null); //发布消息 channel.basicPublish("","hello", null, "hello rabbitmq".getBytes()); ConnectionUtil.closeConnectionAndChannel(channel, connection); }
这时候可以发送一下消息,看看在这里有没有变成1,以此来看下是否连接成功,如果消息拒绝了,就返回前面看看是不是服务器什么的写错了,还是权限不够
建立消费者
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.connection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); //消费消息 //参数1:消费的那个队列名称 //参数2:开始消息的自动确认机制 //参数3:消费时的回调接口 channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override //最后一个参数:消息队列中取出的消息 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("new String(body) = " + new String(body)); } }); }
执行后看是否消费消息,这里我没有进行close操作,会一直监听生产者生产的消息队列,一旦有消息就会进行消费,我这里默认每5秒刷新一次
注:同一个通道可以给多个队列发送消息,消费者和生产者队列特性要严格一致
转载地址:https://blog.csdn.net/hou_shiyu/article/details/112563589 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!