前言
之前的文章spring-boot集成RabbitMQ介绍了spring-boot如何集成RabbitMQ。本篇文章是spring-boot集成MQ的姊妹篇,看看spring-boot如何集成常用的ActiveMQ.
JMS
JMS是什么
百度百科:
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
简单的说,JMS是j2ee技术中的提供了访问消息中间件的规范。它的常见实现如ActiveMQ。
JMS API简介
JMS 规范
和 javax.jms
包定义一组可供 Java 应用程序用于执行消息传递操作的接口。以下列表概括了主要的 JMS 接口:
- Destination:
Destination 对象是应用程序将消息发往的位置和/或应用程序从其接收消息的源。
- ConnectionFactory:
ConnectionFactory 对象包括连接的一组配置属性。应用程序使用连接工厂来创建连接。
- Connection:
Connection 对象包括应用程序与消息传递服务器的活动连接。应用程序使用连接来创建会话。
- Session:
Session 对象是用于发送和接收消息的单个线程上下文。应用程序使用会话来创建消息、消息生产者和消息使用者。会话是事务性或非事务性会话。
- Message:
Message 对象包括应用程序发送或接收的消息。
- MessageProducer:
应用程序使用消息生产者将消息发送到目标。
- MessageConsumer:
应用程序使用消息使用者来接收已发送到目标的消息。
JMS对象及其关系:
ActiveMQ介绍
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特性:
- 支持多种语言和协议。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 设计上保证了高性能的集群,客户端-服务器,点对点
ActiveMQ安装
通过国外地址下载通常比较慢,所以我们一般通过国内的镜像地址下载。
一些常用的apache其下的国内镜像地址:
- https://mirrors.hust.edu.cn/apache/
- https://mirrors.shu.edu.cn/apache/
- https://mirrors.tuna.tsinghua.edu.cn/apache/
通过国内镜像地址下载
wget http://mirror.bit.edu.cn/apache/activemq/5.14.5/apache-activemq-5.14.5-bin.tar.gz
解压:
tar xf apache-activemq-5.14.5-bin.tar.gz
启动:
cd apache-activemq-5.14.5\bin\
注意:如果一台机器启动RabbitMQ以及ActiveMQ,要注意端口冲突问题。 修改ActiveMQ端口,找到..\apache-activemq-5.x.x\conf\
目录下的activemq.xml
,修改<transportConnectors>
标签下的uri
属性,比如:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
如何查看RabbitMQ的端口号?打开web控制台查看即可。
集成置ActiveMQ
使用过程与RabbitMQ
类似,三步完成:
- maven依赖
- 配置ActiveMQ
- 编写代码及单元自测
maven依赖
<!-- activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置ActiveMQ
在application.yml
里添加几行即可:
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 #activeMQ地址
user: admin #用户名
password: admin #密码
in-memory: true #是否启用内存模式(也就是不安装MQ,项目启动时同时也启动一个MQ实例
pool:
enabled: false #是否替换默认的connectionFactory
packages:
trust-all: true #信任所有的包
编写代码及单元自测
一. 配置一个hello
队列:
@Configuration
public class MessageQueueConfig {
@Bean
public javax.jms.Queue helloQueueByActiveMQ() {
return new ActiveMQQueue("hello");
}
}
二. 消息推送端,向hello
队列发送一条消息:
@Component
@Slf4j
public class HelloActiveMQSender {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void send(String content) {
log.info("ActiveMQ send : {}", content);
this.jmsMessagingTemplate.convertAndSend("hello", content);
}
}
三. 消息消费端,消费一条hello
队列里的消息:
@Slf4j
@Component
@EnableJms
public class HelloActiveMQConsumer {
@JmsListener(destination = "hello")
public void process(String hello) {
log.info("ActiveMQ receiver : {}", hello);
}
}
四. 单元测试发送一条消息:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = App.class)
public class HelloActiveMQSenderTest {
@Autowired
private HelloActiveMQSender sender;
@Test
public void sendHello() {
sender.send("Hello " + new Date());
}
}
输出结果,可以确认消息发送并且被消费了:
2018-07-17 16:56:16.320 [main] INFO com.crw.mq.HelloActiveMQSender - ActiveMQ send : Hello Tue Jul 17 16:56:16 CST 2018
2018-07-17 16:56:16.355 [DefaultMessageListenerContainer-1] INFO com.crw.mq.HelloActiveMQConsumer - ActiveMQ receiver : Hello Tue Jul 17 16:56:16 CST 2018