读书人

Twitter Kestrel怎么使用Netty以及Net

发布时间: 2012-06-27 14:20:09 作者: rapoo

Twitter Kestrel如何使用Netty以及Netty scala压测代码

Twitter的核心队列Kestrel使用Netty作为通信模块,从另一个角度证明了Netty的性能和健壮。

Netty是否比MINA强?从底层实现,两者几乎差不多,但Netty的优势是从架构上采用事件通知机制,真正的将异步模式引入来解决各种场景。响应时间可能会加长,但优势在于系统之间的依赖减弱,自身处理能力的决定因素自封闭(瓶颈可以直接根据自身业务处理资源消耗情况估计出来)

?

我们看看Twitter是怎么用Netty。Twitter很多项目都是用scala写的,scala是很简洁的语言,直接运行在jvm上。可以直接调用Java类。下边的代码都是来自Twitter的核心队列项目Kestrel。这个项目很有意思,可能以后还会讨论,这里先说说怎么用Netty。


NettyHandler.scala是处理Netty网络事件的基类,其他具体协议实现类,MemcacheHandler和TextHandler都继承NettyHandler。NettyHandler应用Netty的ChannelUpStreamHandler接口,这个接口处理上行请求。同时继承KestrelHandler。KestrelHandler处理Kestrel消息队列的行为,包括getItem、setItem等等。


Twitter Kestrel怎么使用Netty以及Netty scala压测代码
?

NettyHandler主要方法是handleUpstream。处理上行请求:MessageEvent,ChannelStatEvent,等等。这些实现基本上参照Netty官网给的sample很容易实现。方法不长,才40多行,用scala写出来,有点小清新:)

def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {    event match {      case m: MessageEvent =>        // 具体实现由协议实现类MemcacheHandler等实现        handle(m.getMessage().asInstanceOf[M])      case e: ExceptionEvent =>        // 异常处理        e.getCause() match {          case _: ProtocolError =>            handleProtocolError()          case e: ClosedChannelException =>            finish()          case e: IOException =>            log.debug("I/O Exception on session %d: %s", sessionId, e.toString)          case e =>            log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)            handleException(e)        }        e.getChannel().close()      case s: ChannelStateEvent =>        // 目前状态为connected但statevent.getValue is null,中断连接        if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {          finish()        } else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {          // 创建连接          channel = s.getChannel()          remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]          if (clientTimeout.isDefined) {            channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))          }          channelGroup.add(channel)          // don't use `remoteAddress.getHostName` because it may do a DNS lookup.          log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)        }      case i: IdleStateEvent =>        // 增加idel监控        log.debug("Idle timeout on session %s", channel)        channel.close()      case e =>        // 其他消息继续发出upstream事件        context.sendUpstream(e)    }  }

?

MemcacheHandler和TextHandler是协议具体的实现。继承NettyHandler。因为Memcached协议比较简单,所以协议实现类就不多说了。阅读这些代码主要的障碍还是在于Java程序员对于某些scala的语法不习惯。我这里介绍个简单但是常用的:Scala的泛型。Scala创始人Martin Odersky曾说过,泛型正是他想要创建Scala语言的最重要因素之一。当然Java1.5以后已经引入了泛型,我们对这个东东已经很熟悉了。看看Twitter怎么使用Scala泛型。比教科书上生动很多。和Java使用<>指定泛型类似,NettyHandler中Scala的泛型M,放在[]里。

?

abstract class NettyHandler[M](

? val channelGroup: ChannelGroup,

? queueCollection: QueueCollection,

? maxOpenTransactions: Int,

? clientTimeout: Option[Duration])

extends KestrelHandler(queueCollection, maxOpenTransactions) with?

?

ChannelUpstreamHandler {

...

? def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {

? ? event match {

? ? ? case m: MessageEvent =>

? ? ? ? handle(m.getMessage().asInstanceOf[M])

? }

...

}

?

在NettyHandler中,任何MessageEvent都被转换为泛型M,并交给子类处理。TextHandler和MemcacheHandler是这样给自己的泛型定义的。

class TextHandler( ...) extends NettyHandler[TextRequest](...)?

class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...)?

?

接下来我们自己写一个Scala程序。

Netty服务器压测代码网上有不少版本,基本思路就是实现一个简单的echo handler。还可以添加了一个server主动push的部分。代码用scala实现,可以作为朋友们学习scala的例子。

?

?

import org.jboss.netty.channel._import org.jboss.netty.buffer._import org.jboss.netty.bootstrap.ServerBootstrapimport java.util._import java.util.concurrent._import java.io._import java.net._import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import scala.collection.mutableobject NettyLoadServer {def main(args: Array[String]): Unit = {val testServer = new NettyLoadServer();testServer.loadTest();}}class NettyLoadServer {var channel: Channel = nullprivate var remoteAddress: InetSocketAddress = nullval channels = new mutable.ListBuffer[Channel];var number = 0;class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {        override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)        {            e.getCause().printStackTrace();            channels -= e.getChannel()            e.getChannel().close();        }        override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {            e.getChannel().write(e.getMessage());        }                override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {            e match {        case s: ChannelStateEvent =>                if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {                    channel = s.getChannel()                        remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]                        channels += channel                                                          System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +                        ":" + remoteAddress.getPort)                        }                case e =>                // ignore            }            super.handleUpstream(ctx, e);        }}  class ChannelManagerThread extends Thread { override def run() { while (true) { try {System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));for(s <- channels) {var cb = new DynamicChannelBuffer(256); cb.writeBytes("abcd1234".getBytes()); s.write(cb); }Thread.sleep(500); }catch { case e => e.printStackTrace();} } } } def loadTest() {try {val factory = new NioServerSocketChannelFactory(Executors       .newCachedThreadPool(), Executors.newCachedThreadPool()); val bootstrap = new ServerBootstrap(factory); val handler = new LoadTestHandler(); val pipeline = bootstrap.getPipeline(); pipeline.addLast("loadtest", handler); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8007)); val cmt = new ChannelManagerThread(); cmt.start(); } catch {case e => e.printStackTrace();}}}

附件里是我的scala sbt工程。

?

压测client推荐使用Jboss自己的Benchmark:

http://anonsvn.jboss.org/repos/netty/subproject/benchmark/

?

用ab也可以:

ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/

?

补充:Twitter还有很多很有意思的项目,希望有兴趣的朋友一起来研究学习。

?

读书人网 >软件架构设计

热点推荐