ActiveMQ(五)


今天本篇为ActiveMQQueue基础使用

 

 

    在我看来ActvieMQQueue是其常用的消息发送模式,其应用性比topic远要来的广(大牛勿喷,公司业务、公司行业决定了topic方式在我接触到的项目中使用并不广泛)。

 

 

1.Topicqueue的技术特点对比

 

Topic

Queue

中文全称

发布订阅消息

点对点

有无状态

topic是无状态的并且数据默认不落地。

queue数据默认会在服务器上以文件形式保存,比如Active MQ默认储存在$AMQ_HOME\data\kr-store\data下,亦可配置成DB存储。

完整性保障

不保证发布者发布的每条数据,订阅者都能接受到。

保证每条数据都能被接收者接收。

消息是否会丢失

一般来说发布者发布消息到某一个订阅消息时,只有正在监听该topic地址的订阅者能够接收到消息;如果没有订阅者在监听,该topic就丢失了。

消息发起人发送消息到目标Queue,接收者可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有接收者来取,也不会丢失,直到消息被接收。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个订阅者都能收到发布者发送的消息。订阅者接收完通知服务器

点对点的消息发布接收策略,一个消息发起人发送的消息,只能有一个接受者接收。接收者接收完后,通知服务器已接收,服务器对queue里的消息采取删除或其他操作。

 Topicqueue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

从应用场景上来说topic更适合与电商中的广告推送,广撒传单不必关心是否有人拿到。而Queue更适合用来处理严谨事务,如客户邮件,重要消息发布等需要确认消息抵达的场景。

 

2.(转)效率对比(为之前个人学习抄录并未对来源摘入)

    通过增加监听客户端的并发数来验证,topic的消息推送,是否会因为监听客户端的并发上升而出现明显的下降,测试环境的服务器为ci环境的ActiveMQ,客户端为我的本机。

        从实测的结果来看,topic方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者(线程)并发的前提下,效率差异很明显(由于500线程并发的情况下,我本机的cpu占用率已高达70-90%,所以无法确认是我本机测试造成的性能瓶颈还是topic消息发送方式存在性能瓶颈,造成效率下降如此明显)。

        Topic方式发送的消息与queue方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者并发的前提下,topic方式的效率明显低于queue

        Queue方式发送的消息,在一个订阅者、100个订阅者和500个订阅者的前提下,发送和接收的效率没有明显变化。

Topic实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

101ms

100订阅者

100

10000

103ms

500订阅者

100

50000

14162ms

 

Queue实测数据: 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

96ms

100订阅者

100

100

96ms

500订阅者

100

100

100ms

 

PS:这份仅供参考吧,在个人看来这份数据并没有太大的说服力,首先对queue来说无论有多少个消息接收者,MQ的消息发送总条数都是以消息发起人发起的条数为准,而topic不同的MQ发送总条数是发布者发布的条数与订阅者个数的乘积。

 

下面就是代码了:

首先是消息发起人(Sender):

public class Sender {
private static ConnectionFactory connectionFactory;
private static Connection connection;
private static Session session;
private static int total=100;

public Sender() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}

public MessageProducer getMessageProducer(String stock, int dMode) throws JMSException {
MessageProducer producer = session.createProducer(session.createQueue("myQueue"));
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return producer;
}


public TextMessage setMessageByText(String text) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText(text);
return message;
}
public TextMessage setMessageByMap(Map<String, Object> map) throws JMSException {
TextMessage message = session.createTextMessage();
for (String key : map.keySet()) {
Object o=map.get(key);
if(o instanceof Integer){
message.setIntProperty(key, (Integer)o);
}else if(o instanceof Boolean){
message.setBooleanProperty(key, (Boolean)o);
}else if(o instanceof Long){
message.setLongProperty(key, (Long)o);
}else if(o instanceof String){
message.setStringProperty(key, (String)o);
}else if(o instanceof Double){
message.setDoubleProperty(key, (Double)o);
}else if(o instanceof Short){
message.setShortProperty(key, (Short)o);
}else if(o instanceof Short){
message.setShortProperty(key, (Short)o);
}else if(o instanceof Float){
message.setFloatProperty(key, (Float)o);
}
}
return message;
}

public void sendMessage(MessageProducer producer,TextMessage message) throws JMSException{
producer.send(message);
}
public static void main(String[] args) throws JMSException {
Sender sender = new Sender();
MessageProducer producer = sender.getMessageProducer("test", DeliveryMode.NON_PERSISTENT);
int count=0;
while (true) {
TextMessage message;
if(count%2==0){
Map<String,Object> map=new HashMap<String,Object>();
map.put("name", "My message");
map.put("writer", "Bartholomew");
map.put("content", "this is ActiveMQ!"+count);
message= sender.setMessageByMap(map);
sender.sendMessage(producer,message);
System.out.println("发送第"+ (++count)+"条信息: " + message.toString());
}else{
message=sender.setMessageByText("hello world!"+count);
sender.sendMessage(producer,message);
System.out.println("发送第"+ (++count)+"条信息: " + message.getText());
}
if(total<=count){
break;
}

}
}
}

 接着是消息接收人的监听类:监听类是根据消息的发送情况来写的,请大家自己修改

public class MyMessageListener implements MessageListener{

@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
String content;
try {
content = tm.getText();
if(content!=null){
System.out.println("Received message: " + content);
}else{
Enumeration<String> pnames =tm.getPropertyNames();
while(pnames.hasMoreElements()){
String o = (String)pnames.nextElement();
//message.getObjectProperty(o);
System.out.print(o+":"+message.getObjectProperty(o)+",");
}
System.out.println();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

 最后是消息接收人

public class Receiver {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyMessageListener());
}
}

 

关于运行,对queue来说并没有谁先跑谁后跑的规定。而且一条消息仅有一个接收人,大家可以随意决定执行顺序。

本站声明
本文转载自:http://bartholomew.iteye.com/blog/2239476     作者:bartholomew4     发布日期:2015-08-31     本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。


 
© 2014-2016 ITdaan.com 粤ICP备14056181号