读书人

JGroups(二)

发布时间: 2012-08-09 15:59:21 作者: rapoo

JGroups(2)

转载自:http://whitesock.iteye.com/blog/199269

?

2 API
2.1 Interfaces
2.1.1 Transport
??? Transport接口只定义了最简单的方法,用于发送和接收消息。其定义如下:

Java代码??JGroups(二)
  1. public?interface?Transport?{??
  2. ????void?send(Message?msg)?throws?Exception;??
  3. ????Object?receive(long?timeout)?throws?Exception;??
  4. }??

2.1.2 MessageListener
??? 如果说Transport接口是以pull的方式接收消息,那么MessageListener则是以push的方式处理消息。当收到消息时,receive方法会被调用。getState() 和setState()方法用于在实例间传递状态。其定义如下:

Java代码??JGroups(二)
  1. public?interface?MessageListener?{??
  2. ????void?receive(Message?msg);??
  3. ????byte[]?getState();??
  4. ????void?setState(byte[]?state);??
  5. }??

2.1.3 ExtendedMessageListener
??? ExtendedMessageListener继承自MessageListener,它定义了用来在实例间部分传递状态的方法。如果需要传递的状态数据量很大,那么通过配置协议栈,也可以指定使用流的方式传递状态。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedMessageListener?extends?MessageListener?{??
  2. ????byte[]?getState(String?state_id);??
  3. ????void?setState(String?state_id,?byte[]?state);??
  4. ??
  5. ????void?getState(OutputStream?ostream);??
  6. ????void?setState(InputStream?istream);??
  7. ??
  8. ????void?getState(String?state_id,?OutputStream?ostream);??
  9. ????void?setState(String?state_id,?InputStream?istream);??
  10. }??

2.1.4 MembershipListener
??? 当收到view、suspicion message和block event 的时候,相应的方法会被调用。这个接口常用的方法是viewAccepted(),以便在新的实例加入(或者离开)到集群时得到通知。当JGroups推测某个实例可能崩溃时(此时该实例并未离开集群),suspect()方法会被调用,目前没有unsuspect()方法。当JGroups需要通知集群中的实例不要发送消息时,block()方法会被调用。这通常需要配置FLUSH协议,例如为了确保在进行状态传递的时候,没有实例在发送消息。在block()方法返回后,所有发送消息的线程都会被阻塞,知道FLUSH协议解除阻塞。需要注意的是,block()方法内不应该执行耗时的操作,否则整个FLUSH协议都会被阻塞。其定义如下:

Java代码??JGroups(二)
  1. public?interface?MembershipListener?{??
  2. ????void?viewAccepted(View?new_view);??
  3. ????void?suspect(Address?suspected_mbr);??
  4. ????void?block();??
  5. }??

2.1.5 ExtendedMembershipListener
??? ExtendedMembershipListener继承自MembershipListener。当FLUSH协议解除阻塞的时候,unblock()方法会被调用,所有发送消息的线程可以继续发送消息。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedMembershipListener?extends?MembershipListener?{??
  2. ????void?unblock();??
  3. }??

2.1.6 ChannelListener
??? 可以通过调用JChannel接口的addChannelListener(ChannelListener listener)方法来添加ChannelListener。当Channel被连接或者关闭时,相应的方法会北调用。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ChannelListener?{??
  2. ????void?channelConnected(Channel?channel);??
  3. ????void?channelDisconnected(Channel?channel);??
  4. ????void?channelClosed(Channel?channel);??
  5. ????void?channelShunned();??
  6. ????void?channelReconnected(Address?addr);??
  7. }??

2.1.7 Receiver
??? Receiver继承自MessageListener和MembershipListener。其定义如下:

Java代码??JGroups(二)
  1. public?interface?Receiver?extends?MessageListener,?MembershipListener?{??
  2. }??

2.1.8 ExtendedReceiver
??? ExtendedReceiver继承自Receiver、ExtendedMessageListener和ExtendedMembershipListener。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedReceiver?extends?Receiver,?ExtendedMessageListener,?ExtendedMembershipListener?{??
  2. }??

?

2.2 Channel
2.2.1 Creating a channel
??? 最常见的创建Channel的方法是通过构造函数,此外也可以通过工厂方法。需要注意的是,集群中所有的实例必须有相同的协议栈。JChannel的构造函数之一如下:

Java代码??JGroups(二)
  1. public?JChannel(String?properties)?throws?ChannelException?{??
  2. ????this(ConfiguratorFactory.getStackConfigurator(properties));??
  3. }??

??? 以上的构造函数中,properties参数是冒号分割的字符串,用来配置协议栈。字符串的最左端的元素定义了最底层的协议。如果properties为null,那么将使用缺省的协议栈,即jgroups-all.jar中的udp.xml。以下是个properties参数的例子:

Java代码??JGroups(二)
  1. String?props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):"?+??
  2. "PING(timeout=3000;num_initial_members=6):"?+??
  3. "FD(timeout=5000):"?+??
  4. "VERIFY_SUSPECT(timeout=1500):"?+??
  5. "pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):"?+??
  6. "UNICAST(timeout=300,600,1200):"?+??
  7. "FRAG:"?+??
  8. "pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=true)";??

??? 此外,也可以用File和URL作为构造函数的参数,这种方式允许以本地或者远程的XML文件配置协议栈。XML文件的config节点中的每个子节点定义一个协议,第一个子节点定义了最底层的协议。每个子节点名都对应一个Java类名,缺省的协议名不必是全限定类名,它们位于org.jgroups.stack.protocols包中。如果是自定义的协议,那么则必须是全限定类名。每个协议可以有零个或多个属性,以name/value对的方式指定。以下是jgroups-all.jar中的udp.xml的内容:

Xml代码??JGroups(二)
  1. <config>??
  2. ????<UDP??
  3. ?????????mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"??
  4. ?????????mcast_port="${jgroups.udp.mcast_port:45588}"??
  5. ?????????tos="8"??
  6. ?????????ucast_recv_buf_size="20000000"??
  7. ?????????ucast_send_buf_size="640000"??
  8. ?????????mcast_recv_buf_size="25000000"??
  9. ?????????mcast_send_buf_size="640000"??
  10. ?????????loopback="false"??
  11. ?????????discard_incompatible_packets="true"??
  12. ?????????max_bundle_size="64000"??
  13. ?????????max_bundle_timeout="30"??
  14. ?????????use_incoming_packet_handler="true"??
  15. ?????????ip_ttl="${jgroups.udp.ip_ttl:2}"??
  16. ?????????enable_bundling="true"??
  17. ?????????enable_diagnostics="true"??
  18. ?????????thread_naming_pattern="cl"??
  19. ??
  20. ?????????use_concurrent_stack="true"??
  21. ??
  22. ?????????thread_pool.enabled="true"??
  23. ?????????thread_pool.min_threads="2"??
  24. ?????????thread_pool.max_threads="8"??
  25. ?????????thread_pool.keep_alive_time="5000"??
  26. ?????????thread_pool.queue_enabled="true"??
  27. ?????????thread_pool.queue_max_size="1000"??
  28. ?????????thread_pool.rejection_policy="Run"??
  29. ??
  30. ?????????oob_thread_pool.enabled="true"??
  31. ?????????oob_thread_pool.min_threads="1"??
  32. ?????????oob_thread_pool.max_threads="8"??
  33. ?????????oob_thread_pool.keep_alive_time="5000"??
  34. ?????????oob_thread_pool.queue_enabled="false"??
  35. ?????????oob_thread_pool.queue_max_size="100"??
  36. ?????????oob_thread_pool.rejection_policy="Run"/>??
  37. ??
  38. ????<PING?timeout="2000"??
  39. ????????????num_initial_members="3"/>??
  40. ????<MERGE2?max_interval="30000"??
  41. ????????????min_interval="10000"/>??
  42. ????<FD_SOCK/>??
  43. ????<FD?timeout="10000"?max_tries="5"???shun="true"/>??
  44. ????<VERIFY_SUSPECT?timeout="1500"??/>??
  45. ????<BARRIER?/>??
  46. ????<pbcast.NAKACK?use_stats_for_retransmission="false"??
  47. ???????????????????exponential_backoff="150"??
  48. ???????????????????use_mcast_xmit="true"?gc_lag="0"??
  49. ???????????????????retransmit_timeout="50,300,600,1200"??
  50. ???????????????????discard_delivered_msgs="true"/>??
  51. ????<UNICAST?timeout="300,600,1200"/>??
  52. ????<pbcast.STABLE?stability_delay="1000"?desired_avg_gossip="50000"??
  53. ???????????????????max_bytes="1000000"/>??
  54. ????<VIEW_SYNC?avg_send_interval="60000"???/>??
  55. ????<pbcast.GMS?print_local_addr="true"?join_timeout="3000"??
  56. ????????????????shun="false"??
  57. ????????????????view_bundling="true"/>??
  58. ????<FC?max_credits="500000"??
  59. ????????????????????min_threshold="0.20"/>??
  60. ????<FRAG2?frag_size="60000"??/>??
  61. ????<!--pbcast.STREAMING_STATE_TRANSFER?/-->??
  62. ????<pbcast.STATE_TRANSFER??/>??
  63. ????<!--?pbcast.FLUSH??/-->??
  64. </config>??

??? 以上XML文件中,UDP协议的mcast_addr属性被指定使用jgroups.udp.mcast_addr系统属性,如果没有配置这个系统属性,那么使用缺省值228.10.10.10。

2.2.2 Setting options
??? 通过setOpt(int option, Object value)方法可以给Channel设置属性,目前支持的属性有:

读书人网 >软件架构设计

热点推荐