多线程摘录 002
??? }
}
对于只有单个需要代理的对象, 可以很方便的应用上述的"代理"方式, 对于多个相互独立的对象而已, 也可以使用
public class VisualComponent {
??? private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<KeyListener>();
??? private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<MouseListener>();
??? public void add(..);
??? public void remove(...);
}
但是如果这些对象有相互关系, 就出问题了
public class NumberRange { //显然setLower, setUpper, isInRange三个方法的调用没有原子性保障, 而且isInRange同时依赖到两个相关的值, 很容易导致结果错误. 需要增加某些锁定来保证. 或者运行的情况下, 把两个对象合并为一个
如何给线程安全的类增加方法
1) 修改源代码, 最好. 需了解原来的线程安全的策略
2) 继承类, 并增加synchronized方法.
3) 用Decorator模式, 把类包起来, 增加新方法, 然而需要注意锁定的对象. 比如下面是不对的
public class ListHelper<E> {
??? //锁定对象为返回的list
??? public List<E> list = Collections.synchronizedList(new ArrayList<E>());
??? ...
??? public synchronized boolean putIfAbsent(E x) { //锁定对象为ListHelper对象
??????? boolean absent = !list.contains(x);
??????? if (absent)
??????????? list.add(x);
??????? return absent;
??? }
}
因为两个同步的锁定不是同一个对象, 因此不同的线程可以分别操作putIfAbsent方法和list对象固有的方法. 如果要让这个类正确运行, 需要修正如下
public class ListHelper<E> {
??? //锁定对象为list
??? public List<E> list =Collections.synchronizedList(new ArrayList<E>());
??? ...
??? public boolean putIfAbsent(E x) {
??????? synchronized (list) { //锁定对象为list
??????????? boolean absent = !list.contains(x);
??????????? if (absent)
??????????????? list.add(x);
??????????? return absent;
??????? }
??? }
}
使用synchronized collection的原子性的问题
比如下面的代码:
public class VectorHelper{public static Object getLast(Vector list) { int lastIndex = list.size() - 1; return list.get(lastIndex);}public static void deleteLast(Vector list) { int lastIndex = list.size() - 1; list.remove(lastIndex);}}非常简单, 很多代码都是这样写的, 但是, 这段代码如果要正常运行, 必须是在假设Vertor对象是不被多个线程共享的情况下. 因为虽然Vector本身是线程安全的, 但VectorHelper不是, 并且getLast和deleteLast同时依赖于Vector.size()来判断最后一个元素. 很容易造成ArrayIndexOutOfBoundsException. 如果Vector被多个线程共享, 最简单的就是加上同步, 然后对一个集合的同步会带来很大的性能代价, 因为阻止了其他线程对集合的访问, 特别是当集合很大并且处理的任务非常大的情况下.
另一种变通的方法是, 在遍历集合之前, 复制一份集合的引用. 当然集合复制也有细微的性能代价.
被隐藏的Iterator
public class HiddenIterator { @GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>(); public synchronized void add(Integer i) { set.add(i); } public synchronized void remove(Integer i) { set.remove(i); } public void addTenThings() { Random r = new Random(); for (int i = 0; i < 10; i++) add(r.nextInt()); }}能想象这么简单的代码会在什么地方出错么? 答案是最后一行, ??? ??? }
输出结果:
key: 1, value: null
key: 3, value: three
key: 2, value: two
很短的代码, 一下执行完了, 没有再报错, 可以发现remove操作没有影响到遍历抛出异常. 然而, key: 1, value: null 这行输出说明了我们还留着对这个key的引用.
ConcurrentHashMap有几种新的风格的方法, put-if-absent, remove-if-equal, replace-if-equal, 比如上面的 m.remove("1", "one"); 就是, 如果改为 m.remove("1", "two"); 那么["1","one"]这一对就被保留下来了
CopyOnWriteArrayList, 顾名思义, 就能猜到每次都这个类做了改动, 就会copy一份原来的数组, 在改动后替换原来的数组, 再返回给调用者
BlockingQueue是基于 生产者-消费者 的模式构建的, 包括LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue,SynchronousQueue 4个实现类. 看一点示例代码:
??? public static void putAndOffer() throws InterruptedException {
??? ??? BlockingQueue<String> q = new ArrayBlockingQueue<String>(1);
??? ??? q.offer("1");
??? ??? System.out.println("offer A done");
??? ??? q.offer("2");
??? ??? System.out.println("offer B done");
??? ???
??? ??? q.clear();
??? ??? q.put("A");
??? ??? System.out.println("put A done");
??? ??? q.put("B");
??? ??? System.out.println("put B done");
??? }
输出结果:
offer A done
offer B done
put A done
我们指定了BlockingQueue的容量为1, offer方法是不阻塞的, 所以q.offer("2");直接返回false, 而put方法是阻塞的, 所以System.out.println("put B done");一直没有执行.
PriorityBlockingQueue: 对于加入队列的元素是可排序的
SynchronousQueue: 不能算是queue的实现, 但是模仿了queue的行为, 更像是worker模式的实现. 因为它内部维护了一组用于处理元素的线程, 并且直接把加入队列的元素分配给处理该元素的线程. 如果没有可用线程, 就阻塞了.
BlockingQueue实例代码
下面是摘录的代码, 使用BlockingQueue, 分配N个线程执行文件索引的任务, 很经典:
public class FileCrawler implements Runnable { //这个是producer角色 private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; ... public FileCrawler(File root, BlockingQueue queue, FileFilter filter){ ... } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /*把该目录下所有的文件夹都放进一个queue, 如果满了, 就阻塞*/ private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()) crawl(entry); else if (!alreadyIndexed(entry)) fileQueue.put(entry); //把找到的目录放入队列 } }}public class Indexer implements Runnable { //这个是consumer角色 private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { // queue在indexer和FileCrawler之间共享 this.queue = queue; } public void run() { try { while (true) indexFile(queue.take()); //从queue中获取一个File对象进行索引 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}
//这个是客户端的方法, 可以假设roots是在界面设置的几个目录public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (File root : roots) new Thread(new FileCrawler(queue, filter, root)).start(); //此处是否可以考虑使用线程池更有效? 而且, 如果有些目录层次非常深, //就会有某些线程运行时间非常长, 相反有些线程非常快就执行完毕. //最恶劣的情况是可能其他线程都完成了, 而退化到只有一个线程中运行, //成为"单线程"程序? for (int i = 0; i < N_CONSUMERS; i++) new Thread(new Indexer(queue)).start(); //此处是否可以考虑使用线程池更有效?}基于上述这种"单线程"的考虑, 不谋而合的, JDK6引入了一种"Work Stealing"的模式, 即N个线程中运行, 每个线程处理自己的事情, 一旦自己手头的事情处理, 那么就去尝试"偷"来其他线程的任务来运行. 这样一来, 系统就会时刻保持多个线程中处理任务, 而不是出现"一人忙活,大家凉快"的情况
转自:http://hi.baidu.com/iwishyou2/blog/item/552e162adab77f305243c116.html