zookeeper项目使用几点小结
背景
??前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。
??分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。
?
??本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。
??项目中使用的zookeeper版本3.3.3,对应的文档地址:?http://zookeeper.apache.org/doc/trunk/
zookeeper学习记录?zookeeper学习记录(二)?扩展一:优先集群
先来点背景知识:
1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。
2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档:?http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html
?
我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。
跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。?
?
为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)
?
最后的部署结构就会是:
杭州机房 ?>=3台 (构建leader/follower的zk集群)青岛机房 ?>=1台 (构建observer的zk集群)美国机房 ?>=1台?(构建observer的zk集群)香港机房 ?>=1台?(构建observer的zk集群)
?一句话概括就是: 在单个机房内组成一个投票集群,外围的机房都会是一个observer集群和投票集群进行数据交互。 这样部署的一些好处,大家可以细细体会一下
针对这样的部署结构,我们会引入一个优先集群问题: 比如在美国机房的机器需要优先去访问本机房的zk集群,访问不到后才去访问杭州机房。?默认在zookeeper3.3.3的实现中,认为所有的节点都是对等的。并没有对应的优先集群的概念,单个机器也没有对应的优先级的概念。
扩展代码:(比较暴力,采用反射的方式改变了zk client的集群列表)先使用美国机房的集群ip初始化一次zk client通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表
ZooKeeper zk = null; try { zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() { public void asyncProcess(WatchedEvent event) { //do nothing } }); if (serveraddrs.size() > 1) { // 强制的声明accessible ReflectionUtils.makeAccessible(clientCnxnField); ReflectionUtils.makeAccessible(serverAddrsField); // 添加第二组集群列表 for (int i = 1; i < serveraddrs.size(); i++) { String cluster = serveraddrs.get(i); // 强制获取zk中的地址信息 ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk); List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils .getField(serverAddrsField, cnxn); // 添加第二组集群列表 serverAddrs.addAll(buildServerAddrs(cluster)); } } }扩展二:异步Watcher处理
??最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。
zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。
?
这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。?
可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。
?
扩展代码:
?
public abstract class AsyncWatcher implements Watcher { private static final int DEFAULT_POOL_SIZE = 30; private static final int DEFAULT_ACCEPT_COUNT = 60; private static ExecutorService executor = new ThreadPoolExecutor( 1, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue( DEFAULT_ACCEPT_COUNT), new NamedThreadFactory( "Arbitrate-Async-Watcher"), new ThreadPoolExecutor.CallerRunsPolicy()); public void process(final WatchedEvent event) { executor.execute(new Runnable() {//提交异步处理 @Override public void run() { asyncProcess(event); } }); } public abstract void asyncProcess(WatchedEvent event);}?
说明:zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式扩展三:重试处理
这个也不多说啥,看一下相关文档就清楚了
?
http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandlinghttp://wiki.apache.org/hadoop/ZooKeeper/FAQ#A3需要特殊处理下ConnectionLoss的异常,一种可恢复的异常。
重试处理:
public interface ZooKeeperOperation<T> { public T execute() throws KeeperException, InterruptedException;}/** * 包装重试策略 */ public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException, InterruptedException { KeeperException exception = null; for (int i = 0; i < maxRetry; i++) { try { return (T) operation.execute(); } catch (KeeperException.SessionExpiredException e) { logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e); throw e; } catch (KeeperException.ConnectionLossException e) { //特殊处理Connection Loss if (exception == null) { exception = e; } logger.warn("Attempt " + i + " failed with connection loss so " + "attempting to reconnect: " + e, e); retryDelay(i); } } throw exception; }注意点:Watcher原子性在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。
它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意
总结??zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。
1 楼 AliKevin2011 2011-10-15 agapple,你好,感谢你分享zookeeper。1.你说的 paxns 应该是paxos吧?(不知道是不是我理解错误,当然这是吹毛求疵,我只是希望你的文字更精确,别介意哦。)
2.还有我个人了解zookeeper中确切的说应该使用的是zab(zookeeper automic broadcast)当然是paxos优化版本。比如消除了“羊群效应”,正如你前篇文字提到的当节点数据变化时候,不会按照paxos算法将消息发送到所有的client,而是只发送到序号为下一个序号的client,从而缓解服务压力。
我也是刚刚了解zookeeper,我理解不对之处多多指出。还是感谢你的分享。
2 楼 agapple 2011-10-16 AliKevin2011 写道agapple,你好,感谢你分享zookeeper。
1.你说的 paxns 应该是paxos吧?(不知道是不是我理解错误,当然这是吹毛求疵,我只是希望你的文字更精确,别介意哦。)
2.还有我个人了解zookeeper中确切的说应该使用的是zab(zookeeper automic broadcast)当然是paxos优化版本。比如消除了“羊群效应”,正如你前篇文字提到的当节点数据变化时候,不会按照paxos算法将消息发送到所有的client,而是只发送到序号为下一个序号的client,从而缓解服务压力。
我也是刚刚了解zookeeper,我理解不对之处多多指出。还是感谢你的分享。
恩,多谢你的建议。
1. 的确是paxos,是我的笔误,多谢指出。
2. 其实Watcher并不是paxos算法中的一部分。paxos中只包括提出决议和决议通过,这些都是发生在zookeeper内部过程。而Watcher只是在paxos算法完成后,是zookeeper提供的一些便利性工具,正因为这样的callback才允许我们实现了分布式锁机制提供了可能性
如果你看过上一篇分布式lock文章就知道,这里并没有一种百分百可靠&有效的lock方法,选择EPHEMERAL和PERSISTENT同样存在一些问题。基于EPHEMERAL的实现,通过heartbeat并不能100%确保(比如网络断了,jvm依然存在)