读书人

storm amp; metaq 范例

发布时间: 2013-11-18 00:11:49 作者: rapoo

storm & metaq 实例

????????}

????????Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE);

????????if?(maxSize ==?null) {

????????????log.warn("Using default FETCH_MAX_SIZE");

????????????maxSize =?DEFAULT_MAX_SIZE;

????????}

????????this.id2wrapperMap =?new?ConcurrentHashMap();

????????this.messageQueue =?new?LinkedTransferQueue();

????????try?{

????????????this.collector = collector;

????????????this.setUpMeta(topic, maxSize);

????????}

????????catch?(final?MetaClientException e) {

????????????log.error("Setup meta consumer failed", e);

????????}

????}

????private?void?setUpMeta(final?String topic,?final?Integer maxSize)?throws?MetaClientException {

????????this.sessionFactory =?new?MetaMessageSessionFactory(this.metaClientConfig);

????????this.messageConsumer =?this.sessionFactory.createConsumer(this.consumerConfig);

????????this.messageConsumer.subscribe(topic, maxSize,?new?MessageListener() {

????????????public?void?recieveMessages(final?Message message) {

????????????????final?MetaMessageWrapper wrapper =?new?MetaMessageWrapper(message);

????????????????MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper);

????????????????MetaqSpout.this.messageQueue.offer(wrapper);

????????????????try?{

????????????????????wrapper.latch.await();

????????????????}

????????????????catch?(final?InterruptedException e) {

????????????????????Thread.currentThread().interrupt();

????????????????}

????????????????// 消费失败,抛出运行时异常

????????????????if?(!wrapper.success) {

????????????????????throw?new?RuntimeException("Consume message failed");

????????????????}

????????????}

????????????public?Executor getExecutor() {

????????????????return?null;

????????????}

????????}).completeSubscribe();

????}

????//关闭时调用,进行consumer的shutdown操作

????public?void?close() {

????????try?{

????????????this.messageConsumer.shutdown();

????????}

????????catch?(final?MetaClientException e) {

????????????log.error("Shutdown consumer failed", e);

????????}

????????try?{

????????????this.sessionFactory.shutdown();

????????}

????????catch?(final?MetaClientException e) {

????????????log.error("Shutdown session factory failed", e);

????????}

????}

????//消息发布

????public?void?nextTuple() {

????????if?(this.messageConsumer !=?null) {

????????????try?{

??????????????//进行消息拉取

????????????????final?MetaMessageWrapper wrapper =?this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);

????????????????if?(wrapper ==?null) {

????????????????????return;

????????????????}

????????????????final?Message message = wrapper.message;

????????????????this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());

????????????}

????????????catch?(final?InterruptedException e) {

??????????????e.printStackTrace();

????????????}

???????????

????????}

????????try?{

????????????Thread.sleep(100);

????????}?catch?(InterruptedException e) {

????????????e.printStackTrace();

????????}

????}

????//消息操作成功确认机制

????public?void?ack(final?Object msgId) {

????????if?(msgId?instanceof?Long) {

????????????final?long?id = (Long) msgId;

????????????final?MetaMessageWrapper wrapper =?this.id2wrapperMap.remove(id);

????????????if?(wrapper ==?null) {

????????????????log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));

????????????????return;

????????????}

????????????wrapper.success =?true;

????????????wrapper.latch.countDown();

????????}

????????else?{

????????????log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));

????????}

????}

????//消费失败时返回

????public?void?fail(final?Object msgId) {

????????if?(msgId?instanceof?Long) {

????????????final?long?id = (Long) msgId;

????????????final?MetaMessageWrapper wrapper =?this.id2wrapperMap.remove(id);

????????????if?(wrapper ==?null) {

????????????????log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));

????????????????return;

????????????}

????????????wrapper.success =?false;

????????????wrapper.latch.countDown();

????????}

????????else?{

????????????log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));

????????}

????}

?

2.2 MetaqBolt

?

2.2.1?接口说明

?

该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中去,所以有了这个接口。

其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有该Topic),则可以把数据写入metaq中。

?

2.2.1?上代码

?

该部分代码较简单,可以参考AsyncConsumer代码。

????//构造,传递配置路径

????public?MetaqBolt(String MetaqSpoutXml) {

????????super();

????????this.metaqspoutxml = MetaqSpoutXml;

????}

????//初始化操作

????public?void?prepare(Map?stormConf, TopologyContext context,

????????????OutputCollector collector) {

????????System.out.println("MetaqBolt???--??Start!");

????????this.collector = collector;

????????// 初始化metaq的一些设置,包括zk链接地址,根目录等

????????this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;// "192.168.2.240:2181";

????????this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;// "/meta";

????????this.topic = MetaqSpoutXml.topic;

????????this.metaClientConfig.setZkConfig(this.zkConfig);

????????try?{

????????????this.sessionFactory =?new?MetaMessageSessionFactory(

????????????????????this.metaClientConfig);

????????}?catch?(MetaClientException e) {

????????????e.printStackTrace();

????????}

????????this.producer =?this.sessionFactory.createProducer();

????????this.producer.publish(this.topic);// 发布topic

????}

????public?void?execute(Tuple input) {

????????String str = input.getString(0);

????????try?{

????????????this.sendResult = producer.sendMessage(new?Message(this.topic, str

????????????????????.getBytes()));

????????}?catch?(MetaClientException | InterruptedException e) {

????????????e.printStackTrace();

????????}

????????//当生产失败时打印失败数据

????????if?(!this.sendResult.isSuccess()) {

????????????System.err.println("Send message failed,error message:"

????????????????????+ sendResult.getErrorMessage());

????????}

????}

?

3?代码改动说明

?

关于此次代码变动较大,加了一个spout源的接口,一个bolt的数据落地接口,对topology进行了优化。

具体如下:

(1) 增加了MetaqSpout接口,实现从MetaQ中读取数据(重点

(2) 增加了MetaqBolt接口,实现新的数据落地接口,将数据写入MetaQ中

(3) 修改了Topology主类,实现了节点可配置,通过配置文件列表,即不同类型的spout及bolt可动态搭配,想要实现不同拓扑功能,不用修改代码,而只需修改配置即可(重点

?

?

?

?

?

?

4?关于Metaq的报错

?

?

?

4.1?报错

?

storm & metaq 范例
?

?

根据错误提示,总是没找到其原因。

?

4.2?解决

?
storm & metaq 范例
?

?

?

才发现metaq往zk注册的服务器ip是192.168.122.1不是我本机的ip,之前对metaq进行配置的时候,并没有进行hostName配置,因为metaq据说默认的注册ip是localhost所以就没有注意了,但是好像这种情况来看,他进行zk注册的时候使用的是其代码内部的预留ip进行注册。

我在metaq的server.ini中进行了hostName配置,这个问题就解决了。

代码未能上传,有需要邮件留给我

转自新浪blog

读书人网 >开源软件

热点推荐