读书人

Akka2施用探索5(Typed Actors)

发布时间: 2012-09-22 21:54:54 作者: rapoo

Akka2使用探索5(Typed Actors)

Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。

有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.

有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意

和普通Akka actor一样,有类型actor也一次处理一个消息。

什么时候使用有类型的Actor

有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。

工具箱

返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem

判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);

返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);

返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor 实现的方法中有效 // method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();

返回当前有类型actor的外部代理//Returns the external proxy of the current Typed Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();


返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());

具体例子及说明

package practise.akka.typedactorsimport akka.dispatch.Futureimport akka.japi.Option/** * 这个就是对外的接口,各函数就是Typed Actor的接口方法 */public interface Squarer {    void squareDontCare(int i); //fire-forget    Future<Integer> square(int i); //non-blocking send-request-reply    Option<Integer> squareNowPlease(int i);//blocking send-request-reply    int squareNow(int i); //blocking send-request-reply}

package practise.akka.typedactorsimport akka.dispatch.Futureimport akka.dispatch.Futuresimport akka.actor.TypedActorimport akka.japi.Optionimport akka.actor.ActorContextimport groovy.util.logging.Log4jimport akka.actor.ActorRef/** * 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。) */@Log4jclass SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver {    private String name;    public SquarerImpl() {        this.name = "default";    }    public SquarerImpl(String name) {        this.name = name;    }    public void squareDontCare(int i) {        log.debug("squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----" + i)   //可以从线程号看出是异步处理的        int sq = i * i; //Nobody cares :(        //返回当前的ActorContext,        // 此方法仅在一个TypedActor 实现的方法中有效        ActorContext context = TypedActor.context();        println "context ---- " + context        //返回当前有类型actor的外部代理,        // 此方法仅在一个TypedActor 实现的方法中有效        Squarer mysq = TypedActor.<Squarer> self();        println "--self --" + mysq    }    public Future<Integer> square(int i) {        log.debug("square send-request-reply Future----" + i)   //可以从线程号看出是异步处理的        return Futures.successful(i * i, TypedActor.dispatcher());    }    public Option<Integer> squareNowPlease(int i) {        log.debug("squareNowPlease send-request-reply Option----" + i)   //可以从线程号看出是异步处理的        return Option.some(i * i);    }    public int squareNow(int i) {        log.debug("squareNow send-request-reply result----" + i)   //可以从线程号看出是异步处理的        return i * i;    }    @Override    void onReceive(Object o, ActorRef actorRef) {        log.debug("TypedActor收到消息----${o}---from:${actorRef}")    }}

package practise.akka.typedactorsimport akka.actor.ActorSystemimport akka.actor.TypedActorimport akka.actor.TypedPropsimport com.typesafe.config.ConfigFactoryimport akka.japi.Creatorimport groovy.util.logging.Log4jimport akka.actor.ActorContext/** * 这里创建Typed Actor. */@Log4jclass TypedActorsFactory {    ActorSystem system    private final String config = """akka {    loglevel = "${log?.debugEnabled ? "DEBUG" : "INFO"}"    actor.provider = "akka.remote.RemoteActorRefProvider"    remote.netty.hostname = "127.0.0.1"    remote.netty.port = 2552    remote.log-received-messages = on    remote.log-sent-messages = on}"""    TypedActorsFactory(String sysName) {        this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))    }    Squarer getTypedActorDefault() {        Squarer mySquarer =            TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));        //这里创建的是代理类型        return mySquarer    }    Squarer getTypedActor(String name) {        Squarer otherSquarer =            TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,                    new Creator<SquarerImpl>() {                        public SquarerImpl create() { return new SquarerImpl(name); }  //这里创建的是具体的实现类型                    }),                    name);  //这个name是actor的name:akka//sys@host:port/user/name        return otherSquarer    }}

下面用几个测试用例实验一下

package practise.akka.typedactorsimport akka.actor.ActorRefimport akka.actor.TypedActorimport akka.actor.UntypedActorContextimport akka.dispatch.Futureimport com.baoxian.akka.AkkaClientNoReplyimport com.baoxian.akka.AkkaServerAppclass TestTypedActors extends GroovyTestCase {    def testTypeActor() {        println("----")        TypedActorsFactory factory = new TypedActorsFactory("typedServer")//        Squarer squarer = factory?.getTypedActorDefault()   //创建代理        Squarer squarer = factory?.getTypedActor("serv")      //具体实现        squarer?.squareDontCare(10)        Future future = squarer?.square(10)        AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result")   //这是我自己构建的接收器        app.messageProcessor = {msg, UntypedActorContext context ->            log.info("结果为" + msg)        }        app.startup()        akka.pattern.Patterns.pipe(future).to(app.serverActor)    //Future的返回结果pipe到接收器中了,在log中能看到结果        println "----" + squarer?.squareNowPlease(10)?.get()        println "----" + squarer?.squareNow(10)        //返回有类型actor扩展        TypedActor.get(factory.system)        //返回一个外部有类型actor代理所代表的Akka actor        ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);        actor.tell("消息")     //这个消息将会在SquarerImpl的onReceive方法中接收到        sleep(1000 * 60 * 10)//        TypedActor.get(factory.system).stop(squarer);   //这将会尽快地异步终止与指定的代理关联的有类型Actor        TypedActor.get(factory.system).poisonPill(squarer);//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它    }    def testRemoteTypedActor() {        AkkaClientNoReply client = new AkkaClientNoReply("akka://typedServer@127.0.0.1:2552/user/serv")        client.send("远程消息")      //这将会在SquarerImpl的onReceive方法中接收到        sleep(1000)        client.shutdown()    }}

读书人网 >编程

热点推荐