`
luotuoass
  • 浏览: 639346 次
文章分类
社区版块
存档分类
最新评论

ActiveMQ (二) 使用Queue或者Topic发送/接受消息

 
阅读更多
ActiveMQ () 使用Queue或者Topic发送/接受消息

本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。

.JMS回顾
Java Message Service (JMS)
sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS <chsdate year="1899" month="12" day="30" islunardate="False" isrocdate="False" w:st="on">1.0.2</chsdate>b1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
1.1
主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic
消息。

JMS中间主要定义了2种消息模式Point-to-Point (点对点), Publich/Subscribe Model (发布/订阅者) 其中在Publich/Subscribe 模式下又有Nondurable subscriptiondurable subscription (持久化订阅)2种消息处理方式。

下面是JMS规范基本的接口和实现
JMS Common Interface PTP-Specific Interface Pub/Sub-specific interfaces
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopiSession
MessageProducer QueueSender TopicPublisher
MessageConsumerQueueReceiver/QueueBrwer TopicSubscriber

.使用Queue

  1. 下面以ActiveMQ example的代码为主进行说明

  1. 使用ActiveMQConnectionConnectionFactory 建立连接,注意这里没有用到pool

//建立Connection

  1. protectedConnectioncreateConnection()throwsJMSException,Exception{

  2. ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(user,pwd,url);

  3. Connectionconnection=connectionFactory.createConnection();

  4. if(durable&&clientID!=null){

  5. connection.setClientID(clientID);

  6. }

  7. connection.start();

  8. returnconnection;

  9. }

//建立Session

protectedSessioncreateSession(Connectionconnection)throwsException{

  1. Sessionsession=connection.createSession(transacted,ackMode);

  2. returnsession;

  3. }

2。发送消息的代码
//
建立QueueSession

protectedMessageProducercreateProducer(Sessionsession)throwsJMSException{

  1. Destincationdestination=session.createQueue("queue.hello");

  2. MessageProducerproducer=session.createProducer(destination);

  3. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

  4. if(timeToLive!=0)

  5. producer.setTimeToLive(timeToLive);

  6. returnproducer;

  7. }

//使用Producer发送消息到Queue
producer.send(message);

3。接受消息,JMS规范里面,你可以使用

  1. QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口

  2. publicvoidonMessage(Messagemessage){

  3. //processmessage

  4. }

  5. //setMessageListner,receivemessage

  6. Destincationdestination=session.createQueue("queue.hello");

  7. consumer=session.createConsumer(destination);

  8. consumer.setMessageListener(this);


以上就是使用jms queue发送接受消息的基本方式


Topic

1. 建立连接

java 代码

  1. protectedConnectioncreateConnection()throwsJMSException,Exception{

  2. ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(user,pwd,url);

  3. Connectionconnection=connectionFactory.createConnection();

  4. //如果你要使用DurableSubScription方式,你必须为connection设置一个ClientID

  5. if(durable&&clientID!=null){

  6. connection.setClientID(clientID);

  7. }

  8. connection.start();

  9. returnconnection;

  10. }

2. 建立Session

java 代码

  1. protectedSessioncreateSession(Connectionconnection)throwsException{

  2. Sessionsession=connection.createSession(transacted,ackMode);

  3. returnsession;

  4. }

创建Producer 发送消息到Topic

//createtopiconsession

  1. topic=session.createTopic("topic.hello");

  2. producer=session.createProducer(topic);

  3. //sendmessage

  4. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

  5. producer.send(message);

创建Consumer接受消息(基本上和Queue相同)

  1. Destincationdestination=session.createTopic("topic.hello");

  2. MessageConsumerconsumer=session.createConsumer(destination);

  3. consumer.setMessageListener(this);

  4. //如果你使用的是DurableSubscription方式,你必须在建立connection的时候

  5. //设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName

  6. //connection.setClientID(clientId);

  7. //consumer=session.createDurableSubscriber((Topic)destination,consumerName);

:连接ActiveMQ的方式
ActiveMQConnectionFactory
提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html

常见的有
vm://host:port //vm
tcp://host:port //tcp
ssl://host:port //SSL
stomp://host:port //stomp
协议可以跨语言,目前有很多种stomp client (java,c#,c/c++,ruby,python...);

activemq例子代码发送Message消息

完整的示例程序:

发送TextMessage

public class SendMessage {

private static final String url = "tcp://localhost:61616";;

private static final String QUEUE_NAME = "choice.queue";

protected String expectedBody = "<hello>world!</hello>";

public void sendMessage() throws JMSException{

Connection connection = null;

try{

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

connection = connectionFactory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue(QUEUE_NAME);

MessageProducer producer = session.createProducer(destination);

TextMessage message = session.createTextMessage(expectedBody);

message.setStringProperty("headname", "remoteB");

producer.send(message);

}catch(Exception e){

e.printStackTrace();

}finally{

connection.close();

}

}

*********************************************************************

发送BytesMessage

public class SendMessage {

private String url = "tcp://localhost:61616";

public void sendMessage() throws JMSException{

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("test.queue");

MessageProducer producer = session.createProducer(destination);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

BytesMessage message = session.createBytesMessage();

byte[] content = getFileByte("d://test.jar");

message.writeBytes(content);

try{

producer.send(message);

System.out.println("successful send message");

}catch(Exception e){

e.printStackTrace();

e.getMessage();

}finally{

session.close();

connection.close();

}

}

private byte[] getFileByte(String filename){

byte[] buffer = null;

FileInputStream fin = null;

try {

File file = new File(filename);

fin = new FileInputStream(file);

buffer = new byte[fin.available()];

fin.read(buffer);

} catch

分享到:
评论

相关推荐

    Apache ActiveMQ Queue Topic 详解

    Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    Queue与Topic的比较

    Queue与Topic的比较,学习JMS和activemq必须看的资料之一

    springboot2整合activemq的demo内含queue消息和topic消息

    springboot2整合activemq的demo内含queue消息和topic消息,需要使用者修改application.yml的连接地址信息就可以了,端口号是默认的可以不用修改

    spring集成activemq演示queue和topic 持久化

    本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用

    ActiveMQ的队列、topic模式

    ActiveMQ的两种消息模式的例子,queue和topic,注释很详细,告诉你每步做什么,非springboot整合

    Springboot整合ActiveMQ(Queue和Topic两种模式)

    文章目录ActiveMQ简介1、ActiveMQ简介2、ActiveMQ下载SpringBoot整合ActiveMQ1、新建SpringBoot项目2、项目结构3、相关配置信息4、ActiveMQ配置类Queue队列模式1、队列生产者2、队列消费者3、测试效果Topic模式1、...

    详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    主要介绍了详解Springboot整合ActiveMQ(Queue和Topic两种模式),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    springboot整合activemq案例

    springboot整合activemq案例,queue,topic两种模式 定时运行和controller请求运行两种方式

    Springboot-activeMQ

    ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...activemq的queue和topic  JMS中定义了两种消息模型:  点对点(point to point, queue)  发布/订阅(publish/subscribe,topic)。

    消息队列学习(springboot+kafka+activemq)

    启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer 使用异同点,demo仅仅是最简化代码,演示通信和使用,无法对两者的集群模式进行测试,如果有对集群模式有兴趣,可自行扩展...

    spring-boot-activemq-demo

    spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。

    Spring 实现远程访问详解——jms和activemq

    把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...

    JAVA编程之Spring boot-activeMQ示例

    # Springboot-activeMQ 本项目基于Spring boot这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。...3.主题类型topic,创建主题,生产者发送主题消息,以及消费着消费主题消息

    python 发送和接收ActiveMQ消息的实例

    ActiveMQ是java开发的消息中间件服务。可以支持多种协议(AMQP,MQTT,OpenWire,Stomp),默认的是OpenWire。而python与ActiveMQ的通信使用的是Stomp协议。而如果你的服务没有开启则需要配置开启。 首先需要安装python...

    activemq-web-console-5.11.2

    1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...

    java springboot整合activemq工程

    #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,...

    activeMQ JMS 3种创建方式

    activeMQ JMS 3种创建方式 公共方式 QUEUE TOPIC

    activemq+spring实例

    spring 中使用activemq,queue 、topic的同步接收,异步接收的实例,使用的是maven的项目工程!

    ActiveMQ详细入门使用教程_java_MQ_

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...

Global site tag (gtag.js) - Google Analytics