读书人

storm-0.8.2源码分析之nimbus起步

发布时间: 2013-09-06 10:17:17 作者: rapoo

storm-0.8.2源码分析之nimbus启动
nimbus启动脚本启动逻辑

通过bin/stormnimbus会启动nimbus进程,类似hadoop的jobtracker。

bin/storm是一个python写的脚本,支持jar,kill,nimbus,supervisor,ui,drpc等等命令。

python的入口函数如下

def main():

if len(sys.argv) <= 1:

print_usage()

sys.exit(-1)

global CONFIG_OPTS

config_list, args = parse_config_opts(sys.argv[1:])

parse_config(config_list)

COMMAND = args[0]

ARGS = args[1:]

(COMMANDS.get(COMMAND, unknown_command))(*ARGS)

最后这条语句将根据不同的命令调用其对应的方法,启动nimbus这里调用是

nimbus(klass="backtype.storm.daemon.nimbus"),在nimbus方法内部会调用exec_storm_class(所有命令所调用的方法内部都会调用这个),根据指定的入口类backtype.storm.daemon.nimbus去启动程序。exec_storm_class内部其实就是通过exec执行java 参数 包名.类名 来启动对应的程序。

nimbus的clojure入口

nimbus从脚本中执行的时候,会根据之前传递的backtype.storm.daemon.nimbus这个类,去定位到backtype/storm/daemon/nimbus.clj这个clojure的文件(关于clojure的命名空间与实际文件的映射其实和java中是一样的的),其中-main是nimbus.clj的入口函数,相当c++/java中的main入口。

-main非常简单,如下所示

(defn -main []

(-launch (standalone-nimbus)))

下面将依次分析,standalong-numbus,以及-launch args各自做了些什么。

standalong-numbus是个函数,在这个函数内部是实现INimbus的接口,大致过程是用到了clojure.core.protocols中的reify来定义了匿名的类型实现了java中的package backtype.storm.scheduler这个包中的INimbus这个接口(clojure中接口的对应物称为协议)

下面代码中参数列表中的第一个参数this,是类似c++/java中的类的成员函数的第一个隐藏参数this,也就是通过this确定到底是使用这个协议的哪个实现。

先看standalone-nimbus的代码如下

(defn standalone-nimbus []

(reify INimbus

(prepare [this conf local-dir]

)

(allSlotsAvailableForScheduling [this supervisors topologiestopologies-missing-assignments]

(->> supervisors

(mapcat (fn [^SupervisorDetails s]

(for [p (.getMeta s)]

(WorkerSlot. (.getId s)p))))

set ))

(assignSlots [this topology slots]

)

(getForcedScheduler [this]

nil )

(getHostName [this supervisors node-id]

(if-let [^SupervisorDetails supervisor (get supervisors node-id)]

(.getHost supervisor)))

))

allSlotsAvailableForScheduling 按clojure的语义进行转下,代码如下

(set ((mapcat (fn [^SupervisorDetails s]

(for [p (.getMeta s)]

(WorkerSlot. (.getId s)p)))) supervisors)

)

也就是说通过遍历Collection<SupervisorDetails> 类型的变量supervisors取id和meta,用来初始化WorkerSolt对象,mapcat将WorkerSolt形成集合,之后将这个集合作为参数传递给set,使之返回符合接口要求的Collection<WorkerSlot>

getHostName 是取主机名的接口,supervisors 的类型是Map<String,SupervisorDetails> ,node-id的类型是String。其过程是从supervisors中通过node-id去找符合条件的SupervisorDetails对象,如果找到了,就调用SupervisorDetails的getHost方法取得主机名,否者返回的是nil

(getHostName [this supervisors node-id]

(if-let [^SupervisorDetails supervisor (get supervisors node-id)]

(.getHost supervisor)))

)

其他几个接口standalone-nimbus中没有去实现。

:这里涉及到了clojure与java backtype.storm.scheduler包中几个类的交互。

-launch 将之前的standalone-nimbus做为参数传递给-launch,其代码如下:

(defn -launch [nimbus]

(launch-server! (read-storm-config) nimbus))

配置文件读取

首先,先介绍下(read-storm-config)的引用的过程,nimbus.clj的头部定义命名空间的时候,有一句(:use [backtype.storm.daemon common])引入了common.clj。而common.clj中在定义自身名字空间的时候也引用了其他的其他名字空间(:use [backtype.storm log config util]) ,read-storm-config函数就是在config.clj中实现的,其定义如下:

(defn read-storm-config []

(clojurify-structure (Utils/readStormConfig)))

其中Utils/readStormConfig表示使用了java的backtype.storm.utils包中的Utils类的readStormConfig方法,其定义如下:

publicstatic Map readStormConfig() {

Map ret = readDefaultConfig();

String confFile = System.getProperty("storm.conf.file");

Map storm;

if (confFile==null || confFile.equals("")) {

storm = findAndReadConfigFile("storm.yaml", false);

} else {

storm = findAndReadConfigFile(confFile, true);

}

ret.putAll(storm);

ret.putAll(readCommandLineOpts());

return ret;

}

这个方法就是真正去解析配置的入口,解析过程:先读取默认的defualts.yaml的配置,对于源码来说该文件是在conf目录下,对于release版本则是该文件打到了storm-0.8.2.jar内。

其次,再解析用户配置的storm.yaml中的配置项,如果strom.yaml中有配置项与默认配置文件的配置项有冲突,则会覆盖掉默认配置项。最后,取系统环境变量中设置的storm.options的值,这一般都是没有的,因此这步可以跳过。

:storm的配置文件用到了yaml这种配置格式,可参考其官方http://www.yaml.org/

再回过头看看(Utils/readStormConfig)取得的配置的map做为参数传递给clojurify-structure做了些什么。

(defn clojurify-structure [s]

(prewalk (fn [x]

(cond (instance? Map x) (into {}x)

(instance? List x) (vec x)

true x))

s))

这个函数的功能是将外部数据结构转换成clojure内部的数据结构,在这里,传进来的是java的map,那么会将其转换成clojure的map。另外,prewalk ([f form])的作用简单来说从集合中取值,该值之后会由经过函数计算后的值来替换掉。

因此, (launch-server!(read-storm-config) nimbus)) 中的(read-storm-config) 就是读取storm的配置文件并返回clojure中的map结构。

启动server

启动server的代码的调度是封装在launch-server这个函数内,先看这个函数

(defn launch-server! [conf nimbus]

(validate-distributed-mode! conf)

(let [service-handler (service-handler conf nimbus)

options (-> (TNonblockingServerSocket. (int (confNIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

(.workerThreads 64)

(.protocolFactory(TBinaryProtocol$Factory.))

(.processor(Nimbus$Processor. service-handler))

)

server (THsHaServer. options)]

(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdownservice-handler) (.stop server))))

(log-message "Starting Nimbus server...")

(.serve server)))

先看看第一条语句(validate-distributed-mode!conf)的作用是从底层从配置中读取storm.cluster.mode的值,判断storm.cluster.mode的值配置的是否合法,即,是"local"返回true,是"distributed"返回false,如果是其他的话则抛出异常。之后,validate-distributed-mode!内部判断如果storm.cluster.mode的值是"local"则抛出异常。总结起来简单的说,这个函数是判断在defaults.yaml中的配置项storm.cluster.mode的值是否是"distributed",如果不是的话则会抛出异常(因为,defaults.yaml默认配置的是"distributed")。

;common.clj

(defn validate-distributed-mode! [conf]

(if(local-mode? conf) ;如果是"local"模式会抛出异常

(throw

(IllegalArgumentException. "Cannot start server in localmode!"))))

;config.clj

(defn local-mode? [conf]

(let [mode (conf STORM-CLUSTER-MODE)]; 从配置的map中取storm.cluster.mode的值

(condp = mode

"local" true

"distributed" false

(throw (IllegalArgumentException. ;不是上面两种,抛出异常

(str "Illegal cluster modein conf: " mode)))

)))

看看launch-server!函数中很重要的一句,service-handler(service-handler conf nimbus)

在启动流程的分析过程中,对于这条语句会先简单分析,之后单独拿出来再做详细分析。

(service-handler conf nimbus) 的service-handler其实是一个函数,这个函数不是直接定义的,而是通过defserverfn来定义的,可以在nimbus.clj中找到

(defserverfn service-handler [conf inimbus]...) 其中...是body太长,这里就省略了。

body内部的主要功能是使用clojure的关键字reify去匿名实现一些接口。

内部实现的最主要的接口是package backtype.storm.generated包中的Nimbus类内部的Iface接口。另外还实现了两个接口,分别是package backtype.storm.daemon中的Shutdownable接口,和common.clj中申明的DaemonCommon协议。

这里需要引申说一下,storm是使用thrift作为网络通讯中间件,它提供了序列化和RPC功能,还支持多语言通讯。因此storm是支持多语言客户端的。

使用thrift工具,对编写的.thrift文件生成指定语言的代码,之后只需要实现.thrift中生成的service对应的接口的代码,组织个server启动下,就完成了RPC服务端的功能了。

在storm中.thrift文件位于src/storm.thrift,使用thrift工具生成的代码位于src\jvm\backtype\storm\generated,也就是都属于jvm目录下的packagebacktype.storm.generated。src/storm.thrift内有一个service Nimbus接口,因此在生成backtype.storm.generated包中的Nimbus类内部有接口Iface需要实现,这个就是RPC接口,server实现这些接口,客户端就可以直接调用这些接口,可以像本地调用函数一样调用。

再看分析下(defserverfn service-handler [conf inimbus] ...) 的功能,这条语句相当于是通过宏包裹了函数,并对原有函数执行体进行异常捕捉。可以看到defserverfn的宏定义如下所示:

;common.clj中的宏defserverfn的实现如下所示

(defmacro defserverfn [name & body]

`(let [exec-fn# (fn ~@body)]

(defn ~name [& args#]

(try-cause

(apply exec-fn# args#)

(catch InterruptedException e#

(throw e#))

(catch Throwable t#

(log-error t# "Error on initialization of server " ~(strname))

(halt-process! 13 "Error on initialization")

)))))

;以service-handler带人到宏内,简化后相当于是

;defn server-handler [& args]

;applay (fn [conf inimbus] ... ) [&args]

;其中,server-handler是函数名,而& args就是 confnimbus,三个点号就是原有的函数执行体。

回到launch-server中,看下options最后存的是什么。

options (-> (TNonblockingServerSocket.(int (conf NIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

(.workerThreads 64)

(.protocolFactory(TBinaryProtocol$Factory.))

(.processor(Nimbus$Processor. service-handler))

)

先解释下->的用法:

在clojure中->的作用是将参数一次传递给下一个form,作为这个form的第一个参数。如果存在多个form这将第一个form计算的结果传递个下一个form,作为下一个form的第一个参数。

因此options后面的代码真正展开就就是如下调用形式:

(.processor

(.protocolFactory

(.workerThreads

(THsHaServer$Args.

(TNonblockingServerSocket. (int(conf NIMBUS-THRIFT-PORT))))

64)

(TBinaryProtocol$Factory.))

(Nimbus$Processor. service-handler))

这里是涉及到了thriftjava库中的相关代码的调用,因此将引入thritf的代码方便解释流程。首先创建一个packageorg.apache.thrift.transport中的TNonblockingServerSocket对象,将给对象作为参数传递给package org.apache.thrift.server中的THsHaServer的内部类Args的构造函数,这样就得到一个Args对象。对Args对象调用workerThreads 方法,设置woker线程数为64,并返回原来的Args对象。然后调用Args的基类AbstractServerArgs(位于TServer.java中)中的protocolFactory方法,并创建一个package org.apache.thrift.protocol中的TBinaryProtocol的内部类Factory的对象,将Factory对象作为AbstractServerArgs.protocolFactory方法的参数,内部是设置inputProtocolFactory,outputProtocolFactory为传递进去的factory,然后返回this,也就是返回Args对象。

最后,调用Args的基类AbstractServerArgs的processor方法,内部就是传递进去的processor参数(由Nimbus$Processor. service-handler这条语句生成),生成processor的工厂类的对象,之后返回this,也就是Args对象。

因此options就是保存了Args对象。

再来看看let内的最后一条语句 server(THsHaServer. options)。其中(THsHaServer. options)是调用了packageorg.apache.thrift.server中的THsHaServer的构造函数来生成一个THsHaServer对象,构造函数是需要一个Args对象,而options保存的就是Args对象。因此server保存了一个THsHaServer对象。

let块的表达式的第一条语句如下:

(.addShutdownHook (Runtime/getRuntime)(Thread. (fn [] (.shutdown service-handler) (.stop server))))

这句的调用转为成java其实就是Runtime.getRuntime.addShutdownHook(Thread hook),而参数hook,就是上面的(Thread.(fn [] (.shutdown service-handler) (.stop server)))

这样的话,就容易理解了,它的作用是,增加JVM停止时要处理的事件,当JVM停止时会调用这些注册的hook。

在看看let块的表达式的最后一条语句(.serveserver) ,这就是启动thrift server。serve方法是在THsHaServer的基类AbstractNonblockingServer中,serve方法的功能就是接收连接处理调用。

其实,熟悉java中如何启动一个thriftserver,对于上诉的在clojure启动thrift server也会比较清楚。如果不清楚在java中是如何启动thriftserver的,可以自行去参考thrift中自带的例子。

读书人网 >云计算

热点推荐