读书人

metamorphosis-3-metaQ初始

发布时间: 2013-12-13 00:50:19 作者: rapoo

metamorphosis-3-metaQ初步

Java客户端例子

使用maven,引用metaq的java client非常简单:

<dependency>
??<groupId>com.taobao.metamorphosis</groupId>
??<artifactId>metamorphosis-client-extension</artifactId>
??<version>1.4.3</version>
</dependency>

?

?也可以引用 client-extend

<dependency>
??<groupId>com.taobao.metamorphosis</groupId>
??<artifactId>metamorphosis-client-extension</artifactId>
??<version>1.4.3</version>
</dependency>

?

?

直接用git 将metamorphosis-example?clone ?下载页面:https://github.com/killme2008/metamorphosis-example?。

请注意,1.4.3及以上版本的java客户端只能连接1.4.3及以上版本的MetaQ服务器,而1.4.3之前的老客户端则没有限制。主要是因为1.4.3引入了发布和订阅topic的分离,1.4.3的新客户端只能查找到新版本的broker

消息会话工厂类

在使用消息生产者和消费者之前,需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括:

1.服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表,diamond可以忽略。

2.连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。

3.消费者的消息存储和恢复。

4.协调和管理各种资源,包括创建的生产者和消费者的。

因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类常用的是MetaMessageSessionFactory:

?

MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(new?MetaClientConfig());?

请注意,MessageSessionFactory应当尽量复用,也就是作为应用中的单例来使用,简单的做法是交给spring之类的容器帮你托管

?

消息生产者 ?

?

?

package?com.taobao.metamorphosis.example;

import?java.io.BufferedReader;
import?java.io.InputStreamReader;

import?com.taobao.metamorphosis.Message;
import?com.taobao.metamorphosis.client.MessageSessionFactory;
import?com.taobao.metamorphosis.client.MetaClientConfig;
import?com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import?com.taobao.metamorphosis.client.producer.MessageProducer;
import?com.taobao.metamorphosis.client.producer.SendResult;
import?com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public?class?Producer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????final?MetaClientConfig?metaClientConfig?=?new?MetaClientConfig();
????????final?ZKConfig?zkConfig?=?new?ZKConfig();
????????//设置zookeeper地址
????????zkConfig.zkConnect?=?"127.0.0.1:2181";
????????metaClientConfig.setZkConfig(zkConfig);
????????//?New?session?factory,强烈建议使用单例
????????MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(metaClientConfig);
????????//?create?producer,强烈建议使用单例
????????MessageProducer?producer?=?sessionFactory.createProducer();
????????//?publish?topic
????????final?String?topic?=?"matatest";
????????producer.publish(topic);

????????BufferedReader?reader?=?new?BufferedReader(new?InputStreamReader(System.in));
????????String?line?=?null;
????????while?((line?=?reader.readLine())?!=?null)?{
????????????//?send?message
????????????SendResult?sendResult?=?producer.sendMessage(new?Message(topic,?line.getBytes()));
????????????//?check?result
????????????if?(!sendResult.isSuccess())?{
????????????????System.err.println("Send?message?failed,error?message:"?+?sendResult.getErrorMessage());
????????????}
????????????else?{
????????????????System.out.println("Send?message?successfully,sent?to?"?+?sendResult.getPartition());
????????????}
????????}
????}

}

消息生产者的接口是MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过MessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:

读书人网 >操作系统

热点推荐