线程阻塞
线程阻塞队列
文章分类:Java编程
?
- ??/* ?编写一个线程安全、大小固定的队列 ?提供阻塞式的方法put,若队列没有空间,则方法put会一直等待 ?提供阻塞式的方法take,若队列为空,则方法take会一直等待 ?启动30个线程操作该队列,每个线程进行一次put和一次take操作?? ?*/????/* ?已经按照顺序获得读锁和写锁了,但是如果启动30个线程的话,基本上每次都会死锁,线程都停在read_lock.wait()的位置, ?如果启动20个线程就只有一半的几率会死锁(其实都在等待read_lock的锁,不能说是死锁),但每一个线程take一次必然会put一次, ?或者反过来,按说是不会有都等待read_lock的情况 ?*/????package?com.huawei.test; ?? ??import?java.util.*; ?? ??public?class?Queue1 ?? { ?????final?int?SIZE?=?10;?//队列固定大小 ?? ???ArrayList?store?=?new?ArrayList(SIZE); ?? ?????Object?write_lock?=?new?Object();//用于对store的写操作,如get/add/set/remove ?? ???Object?read_lock?=?new?Object();?//用于对store只读操作,如取size ?? ?????public?Queue1(){} ?? ?????public?void?put?(Object?o)?//没有空间一直等待 ?? ???{ ?????????while(true){ ?? ???????????synchronized(read_lock){ ?? ???????????????try{ ?? ???????????????????if(store.size()?==?SIZE){ ?? ???????????????????????read_lock.wait();//如果队列已满,就释放锁 ?? ???????????????????}else{ ?? ???????????????????????synchronized(write_lock){ ?? ???????????????????????????Thread.sleep(50); ?? ???????????????????????????store.add(o);?//增加元素到队列 ?? ???????????????????????????System.out.println(Thread.currentThread().getName()?+?"****PUT::Size="?+?store.size()); ?? ???????????????????????????Thread.sleep(50); ?? ???????????????????????????read_lock.notifyAll();?//通知其他线程 ?? ???????????????????????????break; ?? ?????????????????????????} ?????????????????????} ?????????????????}catch(Exception?ex){ ?? ???????????????????????ex.printStackTrace(System.err); ?????????????????} ?????????????} ?????????} ?????} ?????????public?Object?take?()?//没有数据一直等待 ?? ???{ ?????????while(true){ ?? ???????????synchronized(read_lock){ ?? ???????????????try{ ?? ???????????????????if(store.size()?==?0){ ?? ???????????????????????read_lock.wait();//如果队列没有数据,就释放锁 ?? ???????????????????}else{ ?? ???????????????????????synchronized(write_lock){ ?? ???????????????????????????Thread.sleep(50); ?? ???????????????????????????Object?obj?=?store.remove(0);?//从队列头移走数据 ?? ???????????????????????????System.out.println(Thread.currentThread().getName()?+?"****Take::Size="?+?store.size()); ?? ???????????????????????????Thread.sleep(50); ?? ???????????????????????????read_lock.notifyAll();//通知其他线程 ?? ???????????????????????????return?obj; ?? ???????????????????????} ?????????????????????} ?????????????????}catch(Exception?ex){ ?? ???????????????????ex.printStackTrace(System.err); ?????????????????} ?????????????} ?????????} ?????} ?????????public?static?void?main(String[]?args){ ?? ???????Queue1?queue1?=?new?Queue1();?//创建一个队列 ?? ?????????for(int?i?=?0;?i?<?30;?i++){?//启动30个线程访问队列 ?? ???????????TestThread?thread?=?new?TestThread(queue1,i); ?? ???????????System.out.println(?"--Thread:"?+?i?+?"?Start!"?); ?? ???????????thread.start(); ?????????????try{ ?? ???????????????Thread.sleep(10);?//没隔十毫秒启动一个线程 ?? ???????????}catch(Exception?ex){ ?? ???????????????ex.printStackTrace(System.err); ?????????????} ?????????} ?????} ????} ??????class?TestThread?extends?Thread ?? { ?????Queue1?queue1?=?null; ?? ???int?sn?=?0; ?? ?????public?TestThread(Queue1?queue1,int?sn){ ?? ???????this.queue1?=?queue1; ?? ???????this.sn?=?sn; ?? ???????setName("Thread::"?+?sn);?//以序号作为线程名 ?? ???} ???????public?void?run(){ ?? ???????String?tmp?=?null; ?? ???????try{ ?? ???????????if(?sn?<?7){?//sn小于7的线程先put,后take ?? ???????????????tmp?=?"Thread-PUT::"?+?sn?+?"---put::"; ?? ???????????????queue1.put(tmp); ?????????????????Thread.sleep(10); ?? ???????????????tmp?=?"Thread-Take::"?+?sn?+?"---take::"; ?? ???????????????Object?obj?=?queue1.take(); ?????????????}else{?//sn大于7的线程先take,后put ?? ???????????????tmp?=?"Thread-Take::"?+?sn?+?"---take::"; ?? ???????????????Object?obj?=?queue1.take(); ?????????????????Thread.sleep(10); ?? ???????????????tmp?=?"Thread-PUT::"?+?sn?+?"---put::"; ?? ???????????????queue1.put(tmp); ?????????????} ?????????????System.out.println("Thread::"?+?sn?+?"?task?over!"); ?? ???????}catch(Exception?ex){ ?? ???????????ex.printStackTrace(System.err); ?????????} ???????} ??}??