1. Messaage Properties
ActiveMQ支持很多消息属性,具体可以参考
http://activemq.apache.org/activemq-message-properties.html
常见得一些属性说明:
1. queue得消息默认是持久化得
2. 消息得优先级默认是4.
3. 消息发送时设置了时间戳。
4. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
5. 如果消息是重发的,将会被标记出来。
6. JMSReplyTo标识响应消息发送到哪个queue.
7. JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
8. JMS同时也记录了消息重发的次数。默认是6次
9. 如果有一组相关联的消息需要处理,可以分组;只需要设置消息组的名字和这个消息的第几个消息。
10. 如果消息中一个事务环境,则TXID将会被设置。
11. 此外ActiveMQ在服务器端额外设置了消息入队和出队的时间戳。
12. ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型
2. Advisory Message
Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统消息。目前支持获取如下信息:
1: consumers, producers和connections的启动和停止
2. 创建和销毁temporary destinations
3. toppics 和queues 的消息过期
4. brokers发送消息给destination,但是没有consumers
5. connections启动和停止
说明:
1. 所有advisory的topic,前缀是:ActiveMQ.Advisory
2. 所有Advisory的消息类型是:‘Advisory’,所有的Advisory都有的消息属性有:originBrokerId,originBrokerName,originBrokerURL
3. 具体支持的topic和queue,请参照:
http://activemq.apache.org/advisory-message.html
Advisory功能默认是关闭的,打开Advisorie的具体实现如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" advisoryForConsumed="true"/> </policyEntries> </policyMap> </destinationPolicy>
开启之后启动ActiveMQ
查看控制台:
已经可以看到以ActiveMQ.Advisory为前缀的topic了,
监听ActiveMQ.Advisory.Producer.Queue.my-queue,实现如下:
package com.wangx.activemq; import com.wangx.activemq.util.MQUtil; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import javax.jms.*; public class MessageReceiver { /** * topic名字 */ private static final String QUEUENAME = "ActiveMQ.Advisory.Producer.Queue.my-queue"; public void receive() throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //获取Session //创建队列 Topic queue = session.createTopic(QUEUENAME); //创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //监听生产者信息 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //将类型转换成ActiveMQMessage ActiveMQMessage activeMQMessage = (ActiveMQMessage)message; try { //打印message System.out.println(activeMQMessage.getMessage()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } public static void main(String[] args) throws JMSException { MessageReceiver messageReceiver = new MessageReceiver(); messageReceiver.receive(); } }
当生产者发送消息时,将会打印如下信息:
ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0:82, originalDestination = null, originalTransactionId = null, producerId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Queue.my-queue, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1541399457800, brokerOutTime = 1541399457800, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1098f4d, dataStructure = ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:DESKTOP-A6T5N2R-57597-1541399457549-1:1:1:1, destination = queue://my-queue, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}, redeliveryCounter = 0, size = 0, properties = {producerCount=1, originBrokerName=localhost, originBrokerURL=tcp://DESKTOP-A6T5N2R:61616, originBrokerId=ID:DESKTOP-A6T5N2R-57210-1541398368987-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0:83, originalDestination = null, originalTransactionId = null, producerId = ID:DESKTOP-A6T5N2R-57210-1541398368987-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Queue.my-queue, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1541399457890, brokerOutTime = 1541399457892, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1385d8ba, dataStructure = RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:DESKTOP-A6T5N2R-57597-1541399457549-1:1:1:1, lastDeliveredSequenceId = -2}, redeliveryCounter = 0, size = 0, properties = {producerCount=0, originBrokerName=localhost, originBrokerURL=tcp://DESKTOP-A6T5N2R:61616, originBrokerId=ID:DESKTOP-A6T5N2R-57210-1541398368987-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
总结Advisory的使用方式:
1. 要在配置文件里面开启Advisories.
2. 消息发送端没什么变化,不做多余改变或配置,
3. 消息接收端:
1)根据你要接收的消息类型,来设置不同的topic,当然也可以使用AdvisorySupport这个类来辅助创建,比如你想要得到消息生产者的信息,你可以:
Topic queue = session.createTopic(QUEUENAME);
Destination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。
3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage,然后再通过getDataStructure方法来得到具体的信息对象。
代码如下:
//将类型转换成ActiveMQMessage ActiveMQMessage activeMQMessage = (ActiveMQMessage)message; try { //打印message System.out.println(activeMQMessage.getDataStructure()); session.commit(); } catch (JMSException e) { e.printStackTrace(); }
这样可以可以拿到相关信息,
控制台如下:
ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:DESKTOP-A6T5N2R-58264-1541401841935-1:1:1:1, destination = queue://my-queue, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0} RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:DESKTOP-A6T5N2R-58264-1541401841935-1:1:1:1, lastDeliveredSequenceId = -2}
2. 延迟和定时消息投递
延迟和定时消息传递(Delay and schedule Message Delivery)
有时候我们不希望消息马上被broker投递出去,而是想要消息60s以后发送给消费者,或者是我们想要让消息每隔一段时间投递一次,一共投递指定的次数。。。类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。
我们只需要把几个描述消息定时调度的方式参数作为属性添加到消息,broker端的调度器酒睡按照我们想要的行为去处理消息。当然需要再xml中配置schedulerSupport属性为true,
一共四个属性
AMQ_SCHEDULED_DELAY: 延迟投递的时间
AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT:重复投递次数
AMQ_SCHEDULED_CRON: Cron表达式
ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置,使用例子如下:延迟60s
在broker上设置schedulerSupport=”true”,然后使用如下代码设置:
TextMessage textMessage = session.createTextMessage("message" + i); long time = 30 * 1000; textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
这样,当生产者发送消息之后,消费者不会马上收到消息,而是会等待30s之后才会开始接收消息
延迟10s,投递3次,间隔5秒的例子
TextMessage textMessage = session.createTextMessage("message" + i); long delay = 10 * 1000; long period= 5 * 1000; int repeat = 3; textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); messageProducer.send(textMessage);
使用CRON表达式,每个小时发送一次
textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");
CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。