循序渐进ActiveMQ(1)----HelloWorld
发布日期:2021-06-30 13:45:30 浏览次数:2 分类:技术文章

本文共 7308 字,大约阅读时间需要 24 分钟。

mq部署方式:单节点、集群、主从

介绍

当前,corba、docm、rmi等rpc中间件技术已经广泛应用于各个领域。但是面对

规模和复杂度都越来搞的分布式系统,这些技术也显示出其局限性:
(1)同步通信,客户发出调用后,必须等待服务对象完成处理并返回结果才能继续执行;
(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进行都必须正常运行;如果由于
服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常;
(3)点对点通信:客户的一次调用只发送给某个单独的目标对象。
面向消息的中间件(Message Oriented middleware,MOM)较好的解决了以上问题。
发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。
1 这种模式下,发送和接收是异步的,发送者无需等待;
2 二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行:
3 一对多通信:对于一个消息可以有多个接收者。
JAVA消息服务(JMS)定义了java中访问消息中间件的接口。jms只是接口,并没有给予实现,实现了jms接口的
消息中间件称为jms provider,已有的MOM系统包括Apache的ActiveMQ、阿里的RocketMQ、IBM的MQSeries、
Microsoft的MSMQ和bEA的MessageQ、RabbitMQ等等,它们基本都遵循JMS规范

 

ActiveMq是Apache出品,最流行的,能力强劲的开源消息总线。

ActiveMq是一个完全支持jms1.1,和j2ee1.4规范的jms provider的实现。
尽管jms规范出台很久了,但是jms依然在当今的j2ee应用中扮演着特殊的角色。
可以说ActiveMq在业界应用最广泛,当然如果想要有更强大的性能和海量数据处理能力,
ActiveMq还需不断的升级版本,80%以上的业务,我们使用ActiveMq是可以满足需求的,
当然后续如淘宝、天猫及双11这种特殊时间,ActiveMq需要进行很复杂的优化源码及架构设计才能完成。
RocketMq是阿里巴巴的一个更强大的分布式消息中间件。而ActiveMq是核心和基础,所以需要掌握好。

 

 

JMS规范

消息中间件需要实现JMS接口:

  • Provider(MessageProvider):生产者
  • Consumer(MessageConsumer):消费者

 

  • PTP:Point To Point,即点对点的消息模型
  • Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型

 

  • Queue:队列目标
  • Topic:主题目标

 

  • ConnectionFactory:连接工厂。JMS用它创建连接。
  • Connection:JMS客户端到JMS Provider的;连接。
  • Destination:消息的目的地。
  • Session:会话,一个发送或者接收消息的线程

 

ConnectionFactory接口(连接工厂)

用户用来创建到JMS提供者的连接的【被管对象】。jms客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或主题连接工厂。

 

Connection接口(连接)

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂以后,就可以创建一个与jms提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。

 

Destination接口(目标)

目标是一个包装了消息目标标识符的【被管对象】,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。

JMS管理员创建这些对象,然后用户通过jndi发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的queue,以及发布者/订阅者模型的Topic
 

MessageConsumer接口(消息消费者)

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

 

MessageProducer接口(消息生产者)

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用发送者,在发送消息时指定目标。

 

Message接口(消息)

是在消费者和生产者之间传送的对象,也就是说从一个应用程序送到另一个应用程序。一个消息有三个主要部分:

消息头(必须):包含用于识别和消息寻找路由的操作设置。
一组消息属性(可选):包括额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)
一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息、对象消息)
消息接口非常灵活,并提供了许多方式来定制消息的内容。
JMS顶一个五种不过不通过的消息正文格式,一级调用的消息类型,允许你发送并接收一些不通过形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage java原始的数据流
  • MapMessage 一套名称-值对
  • TextMessage 一个字符串对象
  • ObjectMessage 一个序列化的java对象
  • ByteMessage 一个未解释字节的数据流

 

Session接口(会话)

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,也就是说消息是按照发送的顺序一个一个接收的。

会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。

一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

 

使用方式

去官网下载:,可以下载最新的,这里使用apache-activemq-5.11.1-bin.zip

使用windows系统改来操作下。现在已经更新到了ActiveMQ 5.15.12 (March 9, 2020)

目录结构

加压缩,查看目录结构:

 
 
        activemq内置jetty容器,activemq可以由不同的存储方式持久化,比如jdbc入库,webapps下是管控台代码,直接运行bin下bat文件就可以把管控台部署运行起来。
 
http://localhost:8161/  默认端口8161,在jetty.xml文件中可以改端口

在jetty-realm.properties可以改用户密码

 

开机启动

在D:\apache-activemq-5.15.12\bin下创建start.bat,如下:

@echo off	start /d "D:\apache-activemq-5.15.12\bin\win64"  activemq.bat@echo

打开Win+R-->gpedit.msc,按图操作:

确定。

或者直接点击InstallService.bat:

开始编码

我们实现一个消息发送者Sender和消息接收者Receiver,发送者生产消息发送到activemq,启动接收者去消费消息

Sender:

package jeff.mq.helloworld;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * @author jeffSheng * 2018年7月3日 */public class Sender {		public static void main(String[] args) throws Exception {		/**		 * 第一步:		 * 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均		 * 使用默认即可,默认端口“tcp://localhost:61616”		 */			ConnectionFactory connectionFactory = 					new ActiveMQConnectionFactory(							ActiveMQConnectionFactory.DEFAULT_USER, 							ActiveMQConnectionFactory.DEFAULT_PASSWORD, 							"tcp://localhost:61616");				/**		 * 第二步:		 * 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法		 * 开启连接,Connection连接默认是关闭的。		 */		Connection connection = connectionFactory.createConnection();		connection.start();				/**		 * 第三步:		 * 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,		 * 参数配置2为签收模式,一般我们设置自动签收。		 */		//我们这里不开启事务		Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);				/**		 * 第四步:		 * 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,		 * 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题		 * 在程序众包给可以使用多个Queue和Topic		 */		Destination destination = session.createQueue("queue1");						/**		 * 第五步:		 * 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)		 * MessageProcuder/MessageConsumer		 */		MessageProducer messageProducer = session.createProducer(destination);				/**		 * 第六步:		 * 我们可以使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode)		 */		//如果设置为持久话方式,我们需要指定具体持久话策略,比如jdbc持久化到数据库		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);				/**		 * 第七步:		 * 最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据,		 * 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection		 */		for (int i = 0; i <= 5; i++) {			TextMessage textMessage = session.createTextMessage();			textMessage.setText("我是消息,Id:"+i);			messageProducer.send(textMessage);			System.out.println("生产者:"+textMessage.getText());		}						//关闭方法会递归向下关闭会话等连接		if(connection!=null){			connection.close();		}			}	}

启动打印:

生产者:我是消息,Id:0生产者:我是消息,Id:1生产者:我是消息,Id:2生产者:我是消息,Id:3生产者:我是消息,Id:4生产者:我是消息,Id:5

消息已经发送到activemq消息服务器了,我们看下:

 接下来我们启动消费者Receiver:

package jeff.mq.helloworld;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/** * @author jeffSheng * 2018年7月3日 */public class Receiver {		public static void main(String[] args) throws Exception {		/**		 * 第一步:		 * 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均		 * 使用默认即可,默认端口“tcp://localhost:61616”		 */			ConnectionFactory connectionFactory = 					new ActiveMQConnectionFactory(							ActiveMQConnectionFactory.DEFAULT_USER, 							ActiveMQConnectionFactory.DEFAULT_PASSWORD, 							"tcp://localhost:61616");				/**		 * 第二步:		 * 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法		 * 开启连接,Connection连接默认是关闭的。		 */		Connection connection = connectionFactory.createConnection();		connection.start();				/**		 * 第三步:		 * 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,		 * 参数配置2为签收模式,一般我们设置自动签收。		 */		//我们这里不开启事务		Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);				/**		 * 第四步:		 * 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,		 * 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题		 * 在程序众包给可以使用多个Queue和Topic		 */		Destination destination = session.createQueue("queue1");						/**		 * 第五步:		 * 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者)		 * MessageProcuder/MessageConsumer		 */		MessageConsumer messageConsumer = session.createConsumer(destination);		while(true){			TextMessage msg = (TextMessage)messageConsumer.receive();			if(msg==null)break;			System.out.println("收到内容: "+msg.getText());		}		//关闭方法会递归向下关闭会话等连接		if(connection!=null){			connection.close();		}			}	}

 

 启动并打印:

 

收到内容: 我是消息,Id:0收到内容: 我是消息,Id:1收到内容: 我是消息,Id:2收到内容: 我是消息,Id:3收到内容: 我是消息,Id:4收到内容: 我是消息,Id:5

再观察下消息服务器:

 

消费者receive方法是同步PULL拉取模式,有三种重载方法:

 

receive():拉取时阻塞的直到获得消息的

receive(long arg):拉取时最多阻塞一段时间的

receiveNoWait():拉取时不阻塞的,有则返回,无则返回null

 

转载地址:https://jeffsheng.blog.csdn.net/article/details/80906521 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:循序渐进ActiveMQ(2)----activemq的安全认证,连接,会话,事务,签收模式及优先级
下一篇:网络编程之每天学习一点点[day15]-----netty实现http协议

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年04月26日 09时13分16秒