ActiveMQ (二) 使用Queue或者Topic发送/接受消息
本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
一.JMS回顾
Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS <chsdate year="1899" month="12" day="30" islunardate="False" isrocdate="False" w:st="on">1.0.2</chsdate>b和1.1.
1.1已经成为主流的JMS Provider事实上的标准了.
1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic消息。
在JMS中间主要定义了2种消息模式Point-to-Point
(点对点), Publich/Subscribe Model (发布/订阅者),
其中在Publich/Subscribe
模式下又有Nondurable
subscription和durable subscription (持久化订阅)2种消息处理方式。
下面是JMS规范基本的接口和实现
JMS Common Interface PTP-Specific Interface Pub/Sub-specific
interfaces
ConnectionFactory
QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection
TopicConnection
Destination Queue
Topic
Session
QueueSession
TopiSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumerQueueReceiver/QueueBrwer
TopicSubscriber
二.使用Queue
-
下面以ActiveMQ example的代码为主进行说明
-
使用ActiveMQ的Connection,ConnectionFactory
建立连接,注意这里没有用到pool
//建立Connection
- protectedConnectioncreateConnection()throwsJMSException,Exception{
- ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(user,pwd,url);
- Connectionconnection=connectionFactory.createConnection();
- if(durable&&clientID!=null){
- connection.setClientID(clientID);
- }
- connection.start();
- returnconnection;
- }
//建立Session
protectedSessioncreateSession(Connectionconnection)throwsException{
- Sessionsession=connection.createSession(transacted,ackMode);
- returnsession;
- }
2。发送消息的代码
//建立QueueSession
protectedMessageProducercreateProducer(Sessionsession)throwsJMSException{
- Destincationdestination=session.createQueue("queue.hello");
- MessageProducerproducer=session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- if(timeToLive!=0)
- producer.setTimeToLive(timeToLive);
- returnproducer;
- }
//使用Producer发送消息到Queue
producer.send(message);
3。接受消息,在JMS规范里面,你可以使用
-
QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口
- publicvoidonMessage(Messagemessage){
- //processmessage
- }
-
- //setMessageListner,receivemessage
- Destincationdestination=session.createQueue("queue.hello");
- consumer=session.createConsumer(destination);
- consumer.setMessageListener(this);
以上就是使用jms queue发送接受消息的基本方式
三 Topic
1. 建立连接
java
代码
- protectedConnectioncreateConnection()throwsJMSException,Exception{
- ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(user,pwd,url);
- Connectionconnection=connectionFactory.createConnection();
-
//如果你要使用DurableSubScription方式,你必须为connection设置一个ClientID
- if(durable&&clientID!=null){
- connection.setClientID(clientID);
- }
- connection.start();
- returnconnection;
- }
2. 建立Session
java
代码
- protectedSessioncreateSession(Connectionconnection)throwsException{
- Sessionsession=connection.createSession(transacted,ackMode);
- returnsession;
- }
创建Producer
发送消息到Topic
//createtopiconsession
- topic=session.createTopic("topic.hello");
- producer=session.createProducer(topic);
- //sendmessage
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(message);
创建Consumer接受消息(基本上和Queue相同)
- Destincationdestination=session.createTopic("topic.hello");
- MessageConsumerconsumer=session.createConsumer(destination);
- consumer.setMessageListener(this);
-
-
//如果你使用的是DurableSubscription方式,你必须在建立connection的时候
-
//设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。
- //connection.setClientID(clientId);
- //consumer=session.createDurableSubscriber((Topic)destination,consumerName);
四:连接ActiveMQ的方式
ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html
常见的有
vm://host:port //vm
tcp://host:port //tcp
ssl://host:port //SSL
stomp://host:port //stomp协议可以跨语言,目前有很多种stomp
client 库(java,c#,c/c++,ruby,python...);
activemq例子代码发送Message消息
完整的示例程序:
发送TextMessage
public class SendMessage {
private static final String url =
"tcp://localhost:61616";;
private static final String QUEUE_NAME =
"choice.queue";
protected String expectedBody =
"<hello>world!</hello>";
public void sendMessage() throws JMSException{
Connection connection = null;
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session =
connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}
*********************************************************************
发送BytesMessage
public class SendMessage {
private String url =
"tcp://localhost:61616";
public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content =
getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}
private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch
分享到:
相关推荐
Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
Queue与Topic的比较,学习JMS和activemq必须看的资料之一
springboot2整合activemq的demo内含queue消息和topic消息,需要使用者修改application.yml的连接地址信息就可以了,端口号是默认的可以不用修改
本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用
ActiveMQ的两种消息模式的例子,queue和topic,注释很详细,告诉你每步做什么,非springboot整合
文章目录ActiveMQ简介1、ActiveMQ简介2、ActiveMQ下载SpringBoot整合ActiveMQ1、新建SpringBoot项目2、项目结构3、相关配置信息4、ActiveMQ配置类Queue队列模式1、队列生产者2、队列消费者3、测试效果Topic模式1、...
主要介绍了详解Springboot整合ActiveMQ(Queue和Topic两种模式),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
springboot整合activemq案例,queue,topic两种模式 定时运行和controller请求运行两种方式
ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...activemq的queue和topic JMS中定义了两种消息模型: 点对点(point to point, queue) 发布/订阅(publish/subscribe,topic)。
启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer 使用异同点,demo仅仅是最简化代码,演示通信和使用,无法对两者的集群模式进行测试,如果有对集群模式有兴趣,可自行扩展...
spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。
把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...
# Springboot-activeMQ 本项目基于Spring boot这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。...3.主题类型topic,创建主题,生产者发送主题消息,以及消费着消费主题消息
ActiveMQ是java开发的消息中间件服务。可以支持多种协议(AMQP,MQTT,OpenWire,Stomp),默认的是OpenWire。而python与ActiveMQ的通信使用的是Stomp协议。而如果你的服务没有开启则需要配置开启。 首先需要安装python...
1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...
#默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,...
activeMQ JMS 3种创建方式 公共方式 QUEUE TOPIC
spring 中使用activemq,queue 、topic的同步接收,异步接收的实例,使用的是maven的项目工程!
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...