阻塞队列
阻塞队列是jdk1.5新特性,本质就是一个队列,当队列为空时,消费方阻塞等待,直到队列有新的对象;队列满了时,生产方阻塞等待,直到队列有空位时;它实现了多线程的排队等待。
队列,Queue接口,有先进先出的特性,与List,Set在同一级别,都继承了Collection接口,而BlockingQueue继承了Queue。
?
Queue提供了以下方法:
???? add,offer:add如果队列满了, 则抛出异常IIIegaISlabEepeplian,offer如果满则返回false
???? remove,poll:移除一个元素,为空时,remove抛异常NoSuchElementException,poll返回null,可以配合queue的size作循环
?
?
BlockingQueue提供了以下方法
????? put:类似add方法,但队列满了不抛异常,直接等待。
????? take:类似poll方法,队列空时不返回null,直接阻塞等待。
?
Blocking队列分为4种
LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
ArrayBlockingQueue在构造时需要指定容量,并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO(先进先出)原则对元素进行排序
PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元素要具有比较能力。
DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。 下面是延迟接口:
- public?interface?Delayed?extends?Comparable<Delayed>?{???????long?getDelay(TimeUnit?unit);??}?
?
使用阻塞队列两个显著的好处就是:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距。
?
?
最后copy了一段代码示例
- public?class?BlockingQueueTest?{??????public?static?void?main(String[]?args)?{??????????Scanner?in?=?new?Scanner(System.in);??????????System.out.print("Enter?base?directory?(e.g.?/usr/local/jdk5.0/src):?");??????????String?directory?=?in.nextLine();??????????System.out.print("Enter?keyword?(e.g.?volatile):?");??????????String?keyword?=?in.nextLine();????????????final?int?FILE_QUEUE_SIZE?=?10;//?阻塞队列大小??????????final?int?SEARCH_THREADS?=?100;//?关键字搜索线程个数????????????//?基于ArrayBlockingQueue的阻塞队列??????????BlockingQueue<File>?queue?=?new?ArrayBlockingQueue<File>(??????????????????FILE_QUEUE_SIZE);????????????//只启动一个线程来搜索目录??????????FileEnumerationTask?enumerator?=?new?FileEnumerationTask(queue,??????????????????new?File(directory));??????????new?Thread(enumerator).start();????????????????????//启动100个线程用来在文件中搜索指定的关键字??????????for?(int?i?=?1;?i?<=?SEARCH_THREADS;?i++)??????????????new?Thread(new?SearchTask(queue,?keyword)).start();??????}??}??class?FileEnumerationTask?implements?Runnable?{??????//哑元文件对象,放在阻塞队列最后,用来标示文件已被遍历完??????public?static?File?DUMMY?=?new?File("");????????private?BlockingQueue<File>?queue;??????private?File?startingDirectory;????????public?FileEnumerationTask(BlockingQueue<File>?queue,?File?startingDirectory)?{??????????this.queue?=?queue;??????????this.startingDirectory?=?startingDirectory;??????}????????public?void?run()?{??????????try?{??????????????enumerate(startingDirectory);??????????????queue.put(DUMMY);//执行到这里说明指定的目录下文件已被遍历完??????????}?catch?(InterruptedException?e)?{??????????}??????}????????//?将指定目录下的所有文件以File对象的形式放入阻塞队列中??????public?void?enumerate(File?directory)?throws?InterruptedException?{??????????File[]?files?=?directory.listFiles();??????????for?(File?file?:?files)?{??????????????if?(file.isDirectory())??????????????????enumerate(file);??????????????else??????????????????//将元素放入队尾,如果队列满,则阻塞??????????????????queue.put(file);??????????}??????}??}??class?SearchTask?implements?Runnable?{??????private?BlockingQueue<File>?queue;??????private?String?keyword;????????public?SearchTask(BlockingQueue<File>?queue,?String?keyword)?{??????????this.queue?=?queue;??????????this.keyword?=?keyword;??????}????????public?void?run()?{??????????try?{??????????????boolean?done?=?false;??????????????while?(!done)?{??????????????????//取出队首元素,如果队列为空,则阻塞??????????????????File?file?=?queue.take();??????????????????if?(file?==?FileEnumerationTask.DUMMY)?{??????????????????????//取出来后重新放入,好让其他线程读到它时也很快的结束??????????????????????queue.put(file);??????????????????????done?=?true;??????????????????}?else??????????????????????search(file);??????????????}??????????}?catch?(IOException?e)?{??????????????e.printStackTrace();??????????}?catch?(InterruptedException?e)?{??????????}??????}??????public?void?search(File?file)?throws?IOException?{??????????Scanner?in?=?new?Scanner(new?FileInputStream(file));??????????int?lineNumber?=?0;??????????while?(in.hasNextLine())?{??????????????lineNumber++;??????????????String?line?=?in.nextLine();??????????????if?(line.contains(keyword))??????????????????System.out.printf("%s:%d:%s%n",?file.getPath(),?lineNumber,??????????????????????????line);??????????}??????????in.close();??????}??}?