读书人

storm-0.8.2源码分析之nimbus运作过程

发布时间: 2013-10-31 12:03:52 作者: rapoo

storm-0.8.2源码分析之nimbus运行过程(一)
nimbus运行过程

这部分将详细讲述启动后,nimbus是如何运行的。在之前讲述nimbus启动过程,有提到nimbus.clj中有个通过defserverfn定义的核心函数service-handler,是实现了Thrift的service Nimbus代码生成的Nimbus.Iface这个句柄类,这个句柄类需要用户自己去实现内部每个函数的处理流程。在rpc client调用相应的函数接口,rpc server就会调用用户实现的句柄类中对应的函数接口,因此这里就需要我们去关注,在不同接口被调用后,是怎么处理的。特别是topology被上传到nimbus这个任务是如何被解析与分发到supervisor。

从头到尾一次分析这个核心函数,会一点点分析,避免一大块牵扯的东西过多。

(defserverfn service-handler [conf inimbus]

(.prepare inimbus conf (master-inimbus-dir conf))

(log-message "Starting Nimbus with conf " conf)

(let [nimbus (nimbus-data conf inimbus)]

...

)

....)

第一句调用prepare的是standalone-nimbus函数实现的INimbus接口类中的prepare方法,这个方法是个空的实现。
第二句是打印log,也不用关心

之后,是let 的定义,nimbus是一个变量,这个变量是的值,是(nimbus-data conf inimbus)的返回值,也就是一个map。这个这个map包含了之后处理所需要的数据,需要我们仔细去跟踪内部到底有些什么东西。

nimbus-data

nimbus-data函数的定义如下所示

(defn nimbus-data [conf inimbus]

(let [forced-scheduler (.getForcedScheduler inimbus)]

{:conf conf

:inimbus inimbus

:submitted-count (atom 0)

:storm-cluster-state (cluster/mk-storm-cluster-state conf)

:submit-lock (Object.)

:heartbeats-cache (atom {})

:downloaders (file-cache-map conf)

:uploaders (file-cache-map conf)

:uptime (uptime-computer)

:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))

:timer (mk-timer :kill-fn (fn [t]

(log-error t"Error when processing event")

(halt-process!20 "Error when processing an event")

))

:scheduler (mk-scheduler conf inimbus)

}))

let的bindings定义的变量forced-scheduler的值是个nil,因为INimbus中实现的getForcedScheduler方法是个空实现。

再看看let 块的执行体,它是一个map。其中:conf 的值是一个storm 配置的map,:inimbus的值是INimbus接口的匿名实现也就是一个适配器。submitted-count 的值是一个atom类型的值,初始化为0。:storm-cluster-state的值是什么,需要看看cluster/mk-storm-cluster-state这个函数内部是做了什么。

cluster/mk-storm-cluster-state这个函数内部匿名实现了StormClusterState协议,而实现时又用到了ClusterState协议。这个函数主要用于获取集群的各种状态,底层zookeeper的操作上层封装在了ClusterState协议的实现,而真正底层访问zookeeper使用apache curator这个开源库。

(defn mk-storm-cluster-state[cluster-state-spec]

(let [[solo? cluster-state] (if (satisfies? ClusterStatecluster-state-spec)

[falsecluster-state-spec]

[true(mk-distributed-cluster-state cluster-state-spec)])

先看看几句,因为ClusterState协议的实现是匿名类型的实现,因此satisfies? 判断得到false,也就是使用[true (mk-distributed-cluster-state cluster-state-spec)] 对[solo?cluster-state]进行赋值。

后面是初始化一相关的callback,这很简单,代码如下

assignment-info-callback(atom {})

supervisors-callback (atom nil)

assignments-callback (atom nil)

storm-base-callback (atom {})

之后是生成state id的,调用的是mk-storm-cluster-state适配器中匿名实现StormClusterState协议中的register方法。register方法内部会生成id,并记录到map中,之后返回一个id作为注册id。

state-id(register

cluster-state

(fn [type path]

(let [[subtree & args](tokenize-path path)]

(condp = subtree

ASSIGNMENTS-ROOT (if(empty? args)

(issue-callback! assignments-callback)

(issue-map-callback! assignment-info-callback(first args)))

SUPERVISORS-ROOT(issue-callback! supervisors-callback)

STORMS-ROOT(issue-map-callback! storm-base-callback (first args))

;; this shouldnever happen

(halt-process! 30"Unknown callback for subtree " subtree args)

)

)))]

之后就是在zookeeper上创建一些列根目录,用于存放集群交互的各种信息,目录分别是:

/assignments

/storms

/supervisors

/workerbeats

/errors

再后面就是使用reify关键字匿名实现StormClusterState协议,在实现过程中依赖到了cluster-state,也就是ClusterState协议的实现。其中StormClusterState中的各个接口具体的作用,在之后分析过程中如果遇到,再回过头再解释。

现在回到nimbus-data接着往下分析

:submit-lock (Object.)

:heartbeats-cache (atom {})

:downloaders (file-cache-map conf)

submit-lock就是一个java的Object对象用于加锁解锁,heartbeats-cache是一个atom引用类型,这个引用所包含的的值是一个空的map。

downloaders是一个TimeCacheMap对象。downloaders的值是(file-cache-mapconf)

其中file-cache-map是一个函数,定义如下

(defn file-cache-map [conf]

(TimeCacheMap.

(int (conf NIMBUS-FILE-COPY-EXPIRATION-SECS))

(reify TimeCacheMap$ExpiredCallback

(expire [this id stream]

(.close stream)

))

))

TimeCacheMap是storm自己实现的,对外提供了ExpiredCallback接口类,这个接口类中只有一个方法是expire方法。在file-cache-map函数中匿名实现了这个接口。

TimeCacheMap对外展示的是一个类似map类,可以在生成这个对象的时候设置超时回调函数(也就是ExpiredCallback这个接口,超时处理,在这里就是关闭流),之后可以像map一样put,get进行操作,在超时时间到了之后,会调用用户设置的回到函数,将超时的数据传递到回调函数中进行处理。需要注意的是,这个类的超时不是很精确。

这里说下为什么会超时不是很精确

TimeCacheMap部分代码如下:

private LinkedList<HashMap<K,V>> _buckets;

_callback = callback;

final long expirationMillis = expirationSecs * 1000L;

final long sleepTime = expirationMillis / (numBuckets-1);

_cleaner = new Thread(new Runnable() {

public void run() {

try {

while(true) {

Map<K, V> dead =null;

Time.sleep(sleepTime);

synchronized(_lock) {

dead =_buckets.removeLast();

_buckets.addFirst(new HashMap<K, V>());

}

if(_callback!=null) {

for(Entry<K,V> entry: dead.entrySet()) {

_callback.expire(entry.getKey(), entry.getValue());

}

}

}

} catch (InterruptedExceptionex) {

}

}

});

_cleaner.setDaemon(true);

_cleaner.start();

_buckets是一个桶,桶里面有若干个子桶,也就是map。以默认值来看,_buckets是有3个子桶,而NIMBUS-FILE-COPY-EXPIRATION-SECS的超时时间是600秒,那么在计算得到的sleepTime就是300秒,在线程中每次进来会先睡300秒,之后去取最后一个子桶,并插入一个新的空的子桶到_buckets头部。之后是将超时的那个子桶中的所有数据换地给callback进行处理。

在看put的操作

public void put(K key, V value) {

synchronized(_lock) {

Iterator<HashMap<K, V>> it = _buckets.iterator();

HashMap<K, V> bucket = it.next();

bucket.put(key, value);

while(it.hasNext()) {

bucket = it.next();

bucket.remove(key);

}

}

}

put总是将数据插入到_buckets的第一个子桶中,并扫描后面的子桶是否有包含put进来的数据,如果有,则移除掉。

那么现在问题来了,如果在线程sleep时,插入一条数据到_buckets的第一个子桶中,它的超时时间将是在2*300至3*300这个时间范围内,因此这个超时时间是不保证精确的,只保证在指定的超时间到了之后会被回调,而非在到的时候立即触发回调。

回到nimbus-data接着看

:uploaders(file-cache-map conf)

:uptime (uptime-computer)

uploaders的值和前面的downloader的值是一样的,也是一个TimeCacheMap对象。

uptime的值是一个函数,是uptime-computer这个函数所返回的一个匿名函数,如下所示:

(defn uptime-computer []

(let [start-time (current-time-secs)]

(fn []

(time-delta start-time)

)))

uptime-computer就是用于计算时间差的,需要注意的是,在添加:uptime (uptime-computer)

这条记录的时候,start-time已经开始取当前时间了,那么在之后真正取:uptime的值开始计算的时候,就会得到时间差。

在service-handler中有条语句nimbus-uptime ((:uptime nimbus)) 这就会计算出,在设置nimbus这个map的时候时间点和当前时间的差。两层圆括号的意思是,先从map中取出值,因为值是一个函数,所以还要再用一对圆括号括起来执行。

回到nimbus-data接着看

:validator(new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))

:timer (mk-timer :kill-fn (fn [t]

(log-error t "Error whenprocessing event")

(halt-process!20 "Error when processing an event")

))

:scheduler (mk-scheduler conf inimbus)

validator的值是一个对象。类名是在default.yaml中所指定的nimbus.topology.validator:"backtype.storm.nimbus.DefaultTopologyValidator",这里的先取到类名,然后根据类名创建对象。

timer的值是一个map ,这个map中包含了定时器,队列,锁等数据。在之前分析supervisor启动过程中也有提到mk-timer,这里再来回顾一下。mk-timer和java中的Timer是很像的。mk-timer的做法是启动一个线程,循环从队列中去peek一个数据,这个数据是一个vector类型的数据,内有会有三个值,分别是时间,函数,和uuid。线程把当前时间和从队列预读的这个数据中的时间值进行比较,如果时间到了,则从队列中弹出这个数据,然后执行这个数据中的第二个值,也就是函数。在执行完之后,会sleep 1秒,这个sleep的实现是在storm自己提供的Time.java中。当sleep结束,会重复这个动作。

mk-timer最后会返回的值如下所示:

{:timer-thread timer-thread

:queue queue

:active active

:lock lock

:cancel-notifier notifier}

也就是定时器线程,队列,锁等数据。

scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。

到这,nimbus-data就分析完了。总的来说,小小的一个map,内部还是隐藏了很多的东西,在经过分析之后,之后这个nimbus-data在service-handler中被用到的时候,就知道取的什么值了。

读书人网 >云计算

热点推荐