ActiveMQ


版本

ActiveMQ 5.11.3

官网下载

第一节 简介与安装

1.1 JMS简介
1.1.1 JMS
JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

消息模型:

Point-to-Point(P2P) 点对点
Publish/Subscribe(Pub/Sub) 发布订阅
1.1.2 P2P

p1

涉及角色

消息队列(Queue)
发送者(Sender)
接收者(Receiver)
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
接收者在成功接收消息之后需向队列应答成功

如果希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式
1.1.3 Pub/Sub

p2

涉及角色

主题(Topic)
发布者(Publisher)
订阅者(Subscriber) 
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者

Pub/Sub的特点

每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
1.1.4 消息的消费
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。 
同步 
订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞 
异步 
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法
1.1.5 JMS编程模型

ConnectionFactory

创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination

Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

Session

Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener

p3

1.2 MQ
1.2.1 消息中间件
消息中间件(MOM:Message Orient middleware)
消息中间件有很多的用途和优点: 
1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 
2. 负责建立网络通信的通道,进行数据的可靠传送。 
3. 保证数据不重发,不丢失 
4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
1.2.2 ActiveMQ
MQ英文名MessageQueue,中文名也就是大家用的消息队列,就是一个消息的接受和转发的容器,可用于消息推送。
ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能
1.2.3 特性
1、多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去
4、通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic,Tomcat)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5、支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6、支持通过JDBC和journal提供高速的消息持久化
7、从设计上保证了高性能的集群,客户端-服务器,点对点
8、支持Ajax
9、支持与Axis的整合
10、可以很容易得调用内嵌JMS provider,进行测试
1.2.4 使用场景
1、多个项目之间集成 
  (1) 跨平台 
  (2) 多语言 
  (3) 多项目
2、降低系统间模块的耦合度,解耦 
	(1) 软件扩展性
3、系统前后端隔离 
(	1) 前后端隔离,屏蔽高安全区

第二节 安装和使用

2.1 安装
2.1.1 上传并解压

命令

cd /opt/work
tar -zxvf apache-activemq-5.11.3-bin.tar.gz

p4

bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录
2.1.2 启动
 /opt/work/apache-activemq-5.11.3/bin/activemq start  启动
 /opt/work/apache-activemq-5.11.3/bin/activemq stop   停止
 在浏览器访问
 http://10.211.55.12:8161/admin/
 账号和密码都是admin

p6

p7

p8

注意:如果主机名称包含下划线、小数点等特殊字符时。启动会失败
2.1.3 无法启动
如果无法启动,查看 data 目录下的 activemq.log 文件查看日志,可能会是因为缺少 commons-dbcp 和 commons-pool 依赖包,我们需要复制依赖包到 lib 目录,注意 lib 下的optional目录有 dbcp2和 pool2依赖包,但是这两个不行,我们需要不带2的包

2.2 基本使用

基于Maven+Idea进行代码编写

2.2.1 pom.xml
 
2.2.2 消息生产者

MQProducer 消息生产者

 
send(Destination destination,Message message,int deliveryMode,int priority,long timeToLive);
参数说明:
1、destination:通过session创建Destination对象,指的是一个客户端用来指定生产的消息目标或消息来源的对象。在PTP模式中,Destination被称作Queue队列,在Pub/Sub模式中Destination被称作topic主题。在程序中可以使用多个Queue或topic
2、message:消息
3、deliveryMode:传送模式,PERSISTENT(默认)和NON_PERSISTENT,如果容忍消息丢失,可以使用NON_PERSISTENT。
4、priority:消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息,默认是4。
5、timeToLive:消息过期时间,默认情况下消息永不过期。

Producer_Main 主函数 启动生产者发布消息

 

p9

p12

2.2.3 消息消费者

MQConsumer 消息消费者

 

Consumer_Main 主函数 启动消费消息

 

p11

p13

p14

2.3 消息过滤
2.3.1 消息的同步和异步
消息的同步接收是指:客户端主动去接收消息,客户端课采用MessageConsume的receive方法去接收下一个消息。
消息的异步接收是指:当消息到达MQ服务器时,MQ服务器主动通知客户端,客户点通过注册一个实现MessageListener接口的对象到MessageConsumer。MessageListener只有一个必须实现的方法:onMessage,它只接受一个参数Message。在为每个发送到Destination的消息实现onMessage时,调用该方法。
2.3.2 消息过滤
MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。
其中messageSelector为消息选择器,noLocal标志默认为false,设置为true时,限制消费者只能接受和自己相同连接(connection)所发布的消息,此标志只适用于topic主题模式,不适用于queue队列模式;name标识订阅topic主题所对应的订阅名称,持久订阅时需要设置此参数
选择器检查了传入消息的“JMS_TYPE”属性,并确定了这个属性的值是否等于某个值。如果相等,则消息被消费,如果不相等,那么消息会被忽略。
2.3.3 代码演示

消息生产者

MsgFilterSender 消息生产者

 

Producer_Main 启动生产者发送消息

 

p15

消息监听者

Listener 监听消息

 

消息消费者

MsgFilterConsumer 消息消费者

 

Consumer_Main 启动消息消费

 

p19

p17

p18

2.4 Pub/Sub模式
2.4.1 概述
发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个,在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能订阅了和我相同的报纸。那么在这里相当于我们在同一个topic里面注册了。对于一份报纸发行来说,它和所有的订阅者就构成了一个1对多的关系,这种关系如下所示:

p20

2.4.2 示例代码

PubSender 消息生产者

 

Producer_Main

 

p21

SubListener 消息监听器

 

SubConsumer 消息消费

 

Consumer_Main 启动

 

p22 p23

2.5 消息持久化

ActiveMQ消息持久化到Mysql

ActiveMQ提供多种数据持久化方式:可以持久化到文件,也可以持久化到数据库,其中数据库可以支持MySQL、Oracle等。
2.5.1 上传数据库驱动jar包
首先需要把MySql的驱动放到ActiveMQ的Lib目录下
如果前面没有添加 dbcp 和 pool, 现在无法启动,参考上面安装时候的错误方式,查看日志,可能是缺少依赖包
2.5.2 修改配置文件activemq.xml
 

p24

p29

2.5.3 测试
重新启动MQ,就会发现db_case库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明已经持久化成功了

p28

2.6 ActiveMQ与Spring整合

准备三个queckstart工程:mq-parent,mq-producer,mq-consumer

2.6.1 mq-parent

根项目,打包方式为pom

pom.xml

 
2.6.2 mq-produce

log4j.xml 日志配置文件

 

mq属性文件 mq.properties

 

spring核心配置 sping.xml

 

spring整合activemq配置 spring-mq.xml

 

MailParam.java 邮件消息模板类

 

MQProducer.java 邮件消息生产者

 

MQProducerTest.java 邮件消息生产测试类

 

运行MQProducerTest,观察MQ服务器

2.6.3 mq-consumer

log4j.xml

 

mq属性文件 mq.properties

 

mail.properties

 

spring.xml

 

spring-mail.xml

 

spring-mq.xml

 

队列监听器

 

邮件发送处理逻辑

 

邮件发送模板

 

测试运行