Java初识RabbitMQ一交换机(direct exchange)
发布日期:2021-06-30 16:02:37 浏览次数:2 分类:技术文章

本文共 4371 字,大约阅读时间需要 14 分钟。

推荐:

Java初识RabbitMQ一交换机(direct exchange)

首先看看AMQP协议,对RabbitMQ的架构会更了解。

创建一个Maven项目,根据自己服务器RabbitMQ的版本导入相应的包。

在这里插入图片描述

com.rabbitmq
amqp-client
3.6.5

直连交换机

直连交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键(我先定义为binding Key,其实是队列与交换机绑定的routing key)的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(binding Key),假设为R。

2)当一个携带着路由键(routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。

直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论,在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

直连交换机图例:

在这里插入图片描述
当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。

生产端

package com.kaven.rabbitmq.exchange.directExchange;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class DirectProducer {
// 自己服务器的IP private static String ip = "IP"; // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口 private static int port = 5672; // RabbitMQ有一个 "/" 的虚拟主机 private static String virtualHost = "/"; // direct exchange ,RabbitMQ提供的direct exchange private static String exchangeName = "amq.direct"; // exchange type private static String exchangeType= "direct"; // 交换机路由的routingKey private static String routingKey = "test"; public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(ip); connectionFactory.setPort(port); connectionFactory.setVirtualHost(virtualHost); // 2 创建连接 Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 发送消息 String msg = "RabbitMQ:Direct Exchange 发送数据"; channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes()); // 5 关闭连接 channel.close(); connection.close(); }}

消费端

package com.kaven.rabbitmq.exchange.directExchange;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;import java.util.concurrent.TimeoutException;public class DirectConsumer {
// 自己服务器的IP private static String ip = "IP"; // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口 private static int port = 5672; // RabbitMQ有一个 "/" 的虚拟主机 private static String virtualHost = "/"; // direct exchange ,RabbitMQ提供的direct exchange private static String exchangeName = "amq.direct"; // exchange type private static String exchangeType= "direct"; // 队列名 private static String queueName = "queue"; // 队列与交换机绑定的routingKey private static String routingKey = "test"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 创建一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(ip); connectionFactory.setPort(port); connectionFactory.setVirtualHost(virtualHost); // 2 创建连接 Connection connection = connectionFactory.newConnection(); // 3 创建Channel Channel channel = connection.createChannel(); // 4 定义Queue ,将Queue绑定到direct exchange channel.queueDeclare(queueName,true , false , false , null); channel.queueBind(queueName , exchangeName , routingKey); // 5 创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 6 设置 channel.basicConsume(queueName , true , consumer); // 7 接收消息 while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(msg); } }}

测试

因为这里使用的是RabbitMQ已经给我们提供好了的交换机,所以无论先启动生产端还是消费端,消费端都可以成功收到消息,如果使用的交换机之前没有定义过,我们需要先定义好交换机,才能生产和消费消息。

RabbitMQ给我们提供了如下图所示的交换机。

在这里插入图片描述
当然我们也可以自己来定义一个交换机,方法名和参数看下图,这里就不展开了。
在这里插入图片描述
无论先启动生产端还是消费端,我们都可以成功收到消息。
在这里插入图片描述
看看RabbitMQ Management
在这里插入图片描述

转载地址:https://kaven.blog.csdn.net/article/details/104255363 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Java初识RabbitMQ一交换机(topic exchange)
下一篇:Java初识RabbitMQ一交换机(default exchange)

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年04月17日 04时32分55秒