java并发学习之六:JCSP(Java Communicating Sequential Processes)实践
首先得描述下什么是JCSP:CSP是Communicating Sequential Processes的缩写
在ibm的developerworks 中国上学习java的并发编程时无意发现的,因为在之前的多线程编程实践中,调试一个多线程的bug都花了大量的时间,所以一直想找点工具或者技巧来学习一下
然后就发现了这个玩意儿,这是JCSP在ibm的地址:
并发专题
JCSP第一部分
JCSP第二部分
JCSP第三部分
JCSP的SVN地址
文中有这样一段话:
“确实想进行多线程编程的开发人员通常准备好了以下一个或两个解决方案(至少是一部分):
长时间艰苦地测试代码,找出所有出现的并发性问题,诚心地希望到应用程序真正运行地时候已经发现并修复了所有这类问题。
大量运行设计模式和为多线程编程建立的指导原则。但是,这类指导原则只在整个系统都按照它们的规范设计的时候才有效,没有设计规则能够覆盖所有类型的系统。
虽然知道的人不多,但是对于编写(然后验证)正确的多线程应用程序这一问题,还有第三个选项。使用称为通信顺序进程(Communicating Sequential Processes,CSP)的精确的线程同步的数学理论,可以在设计时最好地处理死锁和活动锁之类的问题。CSP 由 C.A.R. Hoare 与 20 世纪 70 年代后期设计,CSP 提供了有效的方法,证明用它的构造和工具构建的系统可以免除并发的常见问题。”
就因为这一段话,勾起来我对它的兴趣
详细看下去,大概明白了它的一个原理:将所有的任务抽象为CSProcess,这些任务之间通信用一种既定的协议:通道,任务之间除了通道之外无任何联系,然后除了通道传递的参数,其他的变量均为任务私有,所以完全地避免了往常多线程编码中共享变量的访问问题
先不考虑效率,先设想现在并发编程存在的问题:现在的并发开发,锁,happen-before规则,volatile,基本完全颠覆了我们往常的思维习惯:AB操作明明是连续发生的,但却要考虑中间经过了n多操作。我们需要考虑:A操作后面发生了什么操作,B操作必须发生在A操作之前,怎么保证,等等。这些都是很痛苦的事情,尤其是设计一个很大的并发的框架,像在《Concurrency in practice》里面说的,你在使用(修改,增强)一个并发的设计之前,你必须弄清楚它的锁策略。当一个并发的设计膨胀到一个很大的程度的时候,怎么办?
所以在某种程度上,需要考虑牺牲它的效率了(个人认为),JCSP很优雅地对并发的过程进行了抽象。将一个并发的设计分隔为一个个独立的而且明确的任务,并且能够简单有效地组合起来。当一个并发的设计到达一定程度的时候,可以将子模块用绝对效率的方式来优化,在大模块上用JCSP来关联,在追求效率的同时,也具有了稳健性,也不失为一种选择(尤其对于小企业,与其让一堆对并发只是模棱两可的人来瞎折腾,不如降低一点效率,让应用更稳健)
怀着这样的想法,就拿着JCSP写了点代码来感受一下,也是一个线程池,不过就换用JCSP来实现了
画了个大概的流程图
很简单的一个步骤
代码如下:(测试代码见线程池(三)的测试代码)
public class ThreadPoolTest3WithJCSP implements Executor {/** * 用于发送任务 */private ChannelOutput startCommandOut;public ThreadPoolTest3WithJCSP(final int threadNum) {//最繁琐的是创建通道,然后让通道的两边分别连到不同的CSProcess上ArrayList<CSProcess> processList = new ArrayList<CSProcess>();One2OneChannel channelUsedForController = StandardChannelFactory.getDefaultInstance().createOne2One();startCommandOut = channelUsedForController.out();One2OneChannel[] channelUsedForExecuter = StandardChannelFactory.getDefaultInstance().createOne2One(threadNum);ChannelOutput[] channeloutputUsedForExecuter = new ChannelOutput[threadNum];for(int i = 0;i<threadNum;i++){channeloutputUsedForExecuter[i] = channelUsedForExecuter[i].out();}StandardChannelIntFactory intFactory = new StandardChannelIntFactory();One2OneChannelInt channelUsedForAfterExecuted = intFactory.createOne2One();One2OneChannelInt[] channelUsedForNotifyTask = intFactory.createOne2One(threadNum);AltingChannelInputInt[] channelInputUsedForNotifyTask = new AltingChannelInputInt[threadNum];for(int i = 0;i<threadNum;i++){channelInputUsedForNotifyTask[i] = channelUsedForNotifyTask[i].in();}ThreadExecuteController controller = new ThreadExecuteController(threadNum, channelUsedForController.in(),channelUsedForAfterExecuted.in(), channeloutputUsedForExecuter);processList.add(controller);ExecuteProcess[] exeProcesses = new ExecuteProcess[threadNum];for(int i = 0;i<threadNum;i++){exeProcesses[i] = new ExecuteProcess(channelUsedForExecuter[i].in(), channelUsedForNotifyTask[i].out(), i);processList.add(exeProcesses[i]);}AfterThreadExecuteController atec = new AfterThreadExecuteController(channelInputUsedForNotifyTask, channelUsedForAfterExecuted.out());processList.add(atec);final CSProcess[] pArray = new CSProcess[processList.size()];for(int i = 0;i<pArray.length;i++)pArray[i] = processList.get(i);new Thread(new Runnable() {@Overridepublic void run() {new Parallel(pArray).run();}}).start();}@Overridepublic void execute(Runnable command) {startCommandOut.write(command);}//任务执行者static private class ExecuteProcess implements CSProcess{final ChannelInput in;final ChannelOutputInt out;final int index;public ExecuteProcess(ChannelInput i,ChannelOutputInt o,final int index) {in = i;out = o;this.index = index;}@Overridepublic void run() {while(true){Runnable r = (Runnable)in.read();r.run();System.out.println("complete");out.write(index);}}}//任务控制者static private class ThreadExecuteController implements CSProcess{private final int MAXTHREADNUM;final ChannelOutput[] executeCS;private Queue<ChannelOutput> waitingThreadQueue = new LinkedList<ChannelOutput>();private Queue<Runnable> waitingTask = new LinkedList<Runnable>();Alternative executeAlt;final int COMMANDIN = 0;final int COMPLETECSINDEX = 1;AltingChannelInput commandIn;AltingChannelInputInt completeCSIndex;public ThreadExecuteController(int max,AltingChannelInput commandIn,AltingChannelInputInt completeCSIndex,ChannelOutput[] executeCS) {MAXTHREADNUM = max;this.executeCS = executeCS;executeAlt = new Alternative(new Guard[]{commandIn,completeCSIndex});this.commandIn = commandIn;this.completeCSIndex = completeCSIndex;for(ChannelOutput co:executeCS){waitingThreadQueue.offer(co);}}public void run() {while(true){switch(executeAlt.select()){//如果是有新任务运行case COMMANDIN:{ChannelOutput o = waitingThreadQueue.poll();if(o == null){waitingTask.offer((Runnable)commandIn.read());}else{o.write(commandIn.read());}break;}//如果是有任务结束case COMPLETECSINDEX:{//本应该先看看是否有正好需要运行的任务//暂时不知道怎么让select立刻返回,所以只能直接从等待队列取Runnable r = waitingTask.poll();if(r == null){int i = completeCSIndex.read();waitingThreadQueue.offer(executeCS[i]);}else{int i = completeCSIndex.read();executeCS[i].write(r);}break;}}}}}//任务结束管理者(暂时没做啥事,就把结束的任务索引丢回给管理者)static private class AfterThreadExecuteController implements CSProcess{final ChannelOutputInt out;Alternative executedAlt;public AfterThreadExecuteController(AltingChannelInputInt[] indexes,ChannelOutputInt o) {out = o;executedAlt = new Alternative(indexes);}public void run() {while(true){//maybe do something elseout.write(executedAlt.select());}}}}
跑了一下,发现一下就死锁了~

管理者,阻塞在:executeCS[i].write(r);

执行者,阻塞在:o.write(commandIn.read());
貌似还是有死锁的问题啊,不过由于全是一个个锁围绕着,找问题简单多了,倒是文中所说的验证方法不知道是什么
大概调试了一下,基本上是没加缓冲区导致的,但效率差得惊人,大概是任务太简单了,然后都是线程切换的消耗,真正执行任务的计算反而少了,毕竟不是应用于这个场景的东西,也正常了。
本来打算看下源码的(因为之前以为里面有比较多的奥妙),调试之后发现,每个CSProcess都是用一个守护线程,然后通道都是阻塞的,基本上也没太多奇妙之处,文章中也说基本没使用1.5以后的同步相关的特性,不看也罢。只是这个抽象还是挺好的,很简单就能整理清楚自己的实现思路。