Fork me on GitHub

spring-boot集成ActiveMQ

前言

之前的文章spring-boot集成RabbitMQ介绍了spring-boot如何集成RabbitMQ。本篇文章是spring-boot集成MQ的姊妹篇,看看spring-boot如何集成常用的ActiveMQ.

JMS

JMS是什么

百度百科:

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简单的说,JMS是j2ee技术中的提供了访问消息中间件的规范。它的常见实现如ActiveMQ。

JMS API简介

JMS 规范javax.jms 包定义一组可供 Java 应用程序用于执行消息传递操作的接口。以下列表概括了主要的 JMS 接口:

  • Destination:

Destination 对象是应用程序将消息发往的位置和/或应用程序从其接收消息的源。

  • ConnectionFactory:

ConnectionFactory 对象包括连接的一组配置属性。应用程序使用连接工厂来创建连接。

  • Connection:

Connection 对象包括应用程序与消息传递服务器的活动连接。应用程序使用连接来创建会话。

  • Session:

Session 对象是用于发送和接收消息的单个线程上下文。应用程序使用会话来创建消息、消息生产者和消息使用者。会话是事务性或非事务性会话。

  • Message:

Message 对象包括应用程序发送或接收的消息。

  • MessageProducer:

应用程序使用消息生产者将消息发送到目标。

  • MessageConsumer:

应用程序使用消息使用者来接收已发送到目标的消息。

JMS对象及其关系:

ActiveMQ介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特性:

  • 支持多种语言和协议。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
  • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去
  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  • 设计上保证了高性能的集群,客户端-服务器,点对点

ActiveMQ安装

通过国外地址下载通常比较慢,所以我们一般通过国内的镜像地址下载。

一些常用的apache其下的国内镜像地址:

通过国内镜像地址下载

wget http://mirror.bit.edu.cn/apache/activemq/5.14.5/apache-activemq-5.14.5-bin.tar.gz

解压:

tar xf apache-activemq-5.14.5-bin.tar.gz

启动:

cd apache-activemq-5.14.5\bin\

注意:如果一台机器启动RabbitMQ以及ActiveMQ,要注意端口冲突问题。 修改ActiveMQ端口,找到..\apache-activemq-5.x.x\conf\目录下的activemq.xml,修改<transportConnectors>标签下的uri属性,比如:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

如何查看RabbitMQ的端口号?打开web控制台查看即可。

集成置ActiveMQ

使用过程与RabbitMQ类似,三步完成:

  • maven依赖
  • 配置ActiveMQ
  • 编写代码及单元自测

maven依赖

<!-- activemq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置ActiveMQ

application.yml里添加几行即可:

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616   #activeMQ地址
    user: admin                         #用户名
    password: admin                     #密码
    in-memory: true                     #是否启用内存模式(也就是不安装MQ,项目启动时同时也启动一个MQ实例
    pool:
      enabled: false                    #是否替换默认的connectionFactory
    packages:
      trust-all: true                   #信任所有的包

编写代码及单元自测

一. 配置一个hello队列:

@Configuration
public class MessageQueueConfig {

    @Bean
    public javax.jms.Queue helloQueueByActiveMQ() {
        return new ActiveMQQueue("hello");
    }

}

二. 消息推送端,向hello队列发送一条消息:

@Component
@Slf4j
public class HelloActiveMQSender {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public void send(String content) {
        log.info("ActiveMQ send : {}", content);
        this.jmsMessagingTemplate.convertAndSend("hello", content);
    }
}

三. 消息消费端,消费一条hello队列里的消息:

@Slf4j
@Component
@EnableJms
public class HelloActiveMQConsumer {

    @JmsListener(destination = "hello")
    public void process(String hello) {
        log.info("ActiveMQ receiver : {}", hello);
    }
}

四. 单元测试发送一条消息:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = App.class)
public class HelloActiveMQSenderTest {

    @Autowired
    private HelloActiveMQSender sender;

    @Test
    public void sendHello() {
        sender.send("Hello " + new Date());
    }
}

输出结果,可以确认消息发送并且被消费了:

2018-07-17 16:56:16.320 [main] INFO  com.crw.mq.HelloActiveMQSender - ActiveMQ send : Hello Tue Jul 17 16:56:16 CST 2018
2018-07-17 16:56:16.355 [DefaultMessageListenerContainer-1] INFO  com.crw.mq.HelloActiveMQConsumer - ActiveMQ receiver : Hello Tue Jul 17 16:56:16 CST 2018
-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%