编辑: 迷音桑 2017-10-01
消息服务 MNS 最佳实践 最佳实践 广播拉取消息模型 如何实现一对多拉取消息消费模型 背景描述 阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型.

其中队列提供的是一对多的共 享消息消费模型,采用客户端主动拉取(Pull)模式;

主题模型提供一对多的广播消息消费模型,并且采 用服务端主动推送(Push)模式.上面两种模型基本能满足我们大多数应用场景. 推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送.有些情况下,比 如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式.虽然MNS不直接提供这种消费模型 ,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型.具体方案如下: 解决方案 让主题将消息先推送到队列,然后由消费者从队列拉取消息.这样既可以做到1对多的广播消息,又不需要暴露 消费者的地址;

如下图所示: 消息服务 MNS 最佳实践

1 接口说明 最新的Java SDK(1.1.5)中的CloudPullTopic 默认支持上述解决方案.其中MNSClient 提供下面两个接口来 快速创建CloudPullTopic: 其中,TopicMeta 是创建topic的meta 设置, queueNameList里指定topic消息推送的队列名列表 ;

needCreateQueue表明queueNameList是否需要创建;

queueMetaTemplate是创建queue需要的queue meta 参数设置;

Demo代码 public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate) public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector queueNameList) CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);

MNSClient client = account.getMNSClient();

// build consumer name list. Vector consumerNameList = new Vector();

String consumerName1 = consumer001 ;

String consumerName2 = consumer002 ;

String consumerName3 = consumer003 ;

consumerNameList.add(consumerName1);

consumerNameList.add(consumerName2);

consumerNameList.add(consumerName3);

QueueMeta queueMetaTemplate = new QueueMeta();

queueMetaTemplate.setPollingWaitSeconds(30);

try{ //producer code: // create pull topic which will send message to

3 queues for consumer. 消息服务 MNS 最佳实践

2 String topicName = demo-topic-for-pull ;

TopicMeta topicMeta = new TopicMeta();

topicMeta.setTopicName(topicName);

CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

//publish message and consume message. String messageBody = broadcast message to all the consumers:hello the world. ;

// if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly. TopicMessage tMessage = new RawTopicMessage();

tMessage.setBaseMessageBody(messageBody);

pullTopic.publishMessage(tMessage);

// consumer code: //3 consumers receive the message. CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);

CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);

CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

Message consumer1Msg = queueForConsumer1.popMessage(30);

if(consumer1Msg != null) { System.out.println( consumer1 receive message: + consumer1Msg.getMessageBodyAsRawString());

}else{ System.out.println( the queue is empty );

} Message consumer2Msg = queueForConsumer2.popMessage(30);

if(consumer2Msg != null) { System.out.println( consumer2 receive message: + consumer2Msg.getMessageBodyAsRawString());

下载(注:源文件不在本站服务器,都将跳转到源网站下载)
备用下载
发帖评论
相关话题
发布一个新话题