读书人

java concurrent包懂得

发布时间: 2012-11-15 15:16:13 作者: rapoo

java concurrent包理解

<!--[if !supportLists]-->1、? <!--[endif]-->简介

了解java并发之前先了解java内存模型,java内存有主内存和工作内存,比有个对象Person,有实例变量name,那么Person的实例中name属性就是在主内存中,如果多个线程同时操作Person,那么每个线程会有Person属性name的副本放在每个线程的工作内存中,每个工作内存修改后会同步到主内存中,但是这里就有问题:一致性问题和可见性问题,导致数据丢失或脏数据。

为了解决这个问题,引入了同步机制synchronized,是多个线程同时只有一个线程可以操作共享变量(主内存对象)

?

<!--[if !supportLists]-->2、? <!--[endif]-->在java5后sun引入了concurrent包的一些同步机制,要了解这个首先了解AbstractQueuedSynchronizer

?

<!--[if !supportLists]-->3、? <!--[endif]-->AbstractQueuedSynchronizer了解

实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,此类的设计目标是成为依靠单个原子int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制

以上是java API的描述,简单就是提供线程阻塞和同步的对象,子类需要实现tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared,isHeldExclusively等方法

?

下面是常用方法介绍

public final void acquire(int?arg)

以独占模式获取对象,忽略中断。通过至少调用一次tryAcquire(int)来实现此方法,并在成功时返回。否则在成功之前,一直调用tryAcquire(int)将线程加入队列,线程可能重复被阻塞或不被阻塞。可以使用此方法来实现Lock.lock()方法。

参数:

arg - acquire 参数。此值被传送给 tryAcquire(int),但它是不间断的,并且可以表示任何内容。

?

protected boolean tryAcquire(int?arg)

试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

此方法总是由执行acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。可以用此方法来实现Lock.tryLock()方法。

默认实现将抛出 UnsupportedOperationException。

参数:

arg - acquire 参数。该值总是传递给acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

如果成功,则返回true。在成功的时候,此对象已经被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持独占模式

?

public final boolean release(int?arg)

以独占模式释放对象。如果tryRelease(int)返回 true,则通过消除一个或多个线程的阻塞来实现此方法。可以使用此方法来实现Lock.unlock()方法

参数:

arg - release 参数。此值被传送给 tryRelease(int),但它是不间断的,并且可以表示任何内容。

返回:

从tryRelease(int)返回的值

?

protected boolean tryRelease(int?arg)

试图设置状态来反映独占模式下的一个释放。

此方法总是由正在执行释放的线程调用。

默认实现将抛出 UnsupportedOperationException。

参数:

arg - release 参数。该值总是传递给release 方法的那个值,或者是因某个条件等待而保存在条目上的当前状态值。该值是不间断的,并且可以表示任何内容。

返回:

如果此对象现在处于完全释放状态,从而使等待的线程都可以试图获得此对象,则返回true;否则返回false。

?

public final void acquireShared(int?arg)

以共享模式获取对象,忽略中断。通过至少先调用一次tryAcquireShared(int)来实现此方法,并在成功时返回。否则在成功之前,一直调用tryAcquireShared(int)将线程加入队列,线程可能重复被阻塞或不被阻塞。

参数:

arg - acquire 参数。此值被传送给 tryAcquireShared(int),但它是不间断的,并且可以表示任何内容。

?

?

public final boolean releaseShared(int?arg)

以共享模式释放对象。如果tryReleaseShared(int)返回 true,则通过消除一个或多个线程的阻塞来实现该方法。

参数:

arg - release 参数。此值被传送给 tryReleaseShared(int),但它是不间断的,并且可以表示任何内容。

protected int tryAcquireShared(int?arg)

试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。

此方法总是由执行acquire 线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。

默认实现将抛出 UnsupportedOperationException。

参数:

arg - acquire 参数。该值总是传递给acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

在失败时返回负值;如果共享模式下的获取成功但其后续共享模式下的获取不能成功,则返回 0;如果共享模式下的获取成功并且其后续共享模式下的获取可能够成功,则返回正值,在这种情况下,后续等待线程必须检查可用性。(对三种返回值的支持使得此方法可以在只是有时候以独占方式获取对象的上下文中使用。)在成功的时候,此对象已被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持共享模式

<!--[if !supportLists]-->4、? <!--[endif]-->CountDownLatch

?

/**

?* 原理是内部维护一个大小i的信号量,

?* 使用await方法会一直等待直到信号量为0,

?* 使用countDown方法会时信号量-1,当信号量为0时await取消阻塞

?* @author jin.xiong

?*

?*/

public class CountDownLatch

{

??? private static final class Sync extends AbstractQueuedSynchronizer

??? {

?

??? /**

??? ?* 获取状态

??? ?* @return

??? ?*/

??????? int getCount()

??????? {

??????????? return getState();

??????? }

?

??????? /**

???????? * 当state为0是才可以获取锁,否则一直等待

???????? */

??????? public int tryAcquireShared(int i)

??????? {

??????????? return getState() != 0 ? -1 : 1;

??????? }

?

??????? /**

???????? * 把stage通过CAS减一

???????? */

??????? public boolean tryReleaseShared(int i)

??????? {

??????????? int j;

? ??????????int k;

??????????? do

??????????? {

??????????????? j = getState();

??????????????? if(j == 0)

??????????????????? return false;

??????????????? k = j - 1;

??????????? } while(!compareAndSetState(j, k));

??????????? return k == 0;

??????? }

?

??????? Sync(int i)

??????? {

??????????? setState(i);

??????? }

??? }

?

?

??? public CountDownLatch(int i)

??? {

??????? if(i < 0)

??????? {

??????????? throw new IllegalArgumentException("count < 0");

??????? } else

??????? {

??????????? sync = new Sync(i);

??????????? return;

??????? }

??? }

?

??? public void await()

??????? throws InterruptedException

??? {

??????? sync.acquireSharedInterruptibly(1);

??? }

?

??? public boolean await(long l, TimeUnittimeunit)

??????? throws InterruptedException

??? {

??????? return sync.tryAcquireSharedNanos(1,timeunit.toNanos(l));

??? }

?

??? public void countDown()

??? {

??????? sync.releaseShared(1);

??? }

?

??? public long getCount()

??? {

??????? return (long)sync.getCount();

??? }

?

??? public String toString()

??? {

??????? return (new StringBuilder()).append(super.toString()).append("[Count = ").append(sync.getCount()).append("]").toString();

??? }

???

???

??? public? static void main(String[] args) throws InterruptedException{

??? final CountDownLatchdown = new CountDownLatch(10);

??? System.out.println(down.getCount());

???

??? new Thread(){

??? ??? public void run(){

??? ??????? try {

????????????????? down.await();

????????????? } catch (InterruptedException e) {

????????????? }

??? ??????? System.out.println("CountDownLatch Stage 为0了?? " +down.getCount() );

??? ??? }

??? }.start();

??? for(int i=0;i<10;i++){

??? ??? Thread.sleep(1000);

??? ??? System.out.println(i);

??? ??? down.countDown();

??? }

??? System.out.println(down.getCount());

??? }

?

??? private final Sync sync;

}

?

?

/**

?* 计数信号量,原理内部维护一个大小为i的许可数量

?* acquire(s)方法把当前许可号-s

?* release(s)方法吧当前许可号+s

?* @author jin.xiong

?*

?*/

public class Semaphore

??? implements Serializable

{

??? /**

??? ?* 公平的Sync

??? ?* 有两个判断条件,如果当前的线程不在等待FIFO线程队列的首部,将继续等待,且stage必须大于0

??? ?* @author jin.xiong

??? ?*

??? ?*/

??? static final class FairSync extends Sync

??? {

?

??????? protected int tryAcquireShared(int i)

??????? {

??????????? Thread thread = Thread.currentThread();

??????????? int j;

??????????? int k;

??????????? do

??????????? {

??????????????? Thread thread1 =getFirstQueuedThread();

??????????????? if(thread1 != null && thread1 != thread)

??????????????????? return -1;

??????????????? j = getState();

??????????????? k = j - i;

??????????? } while(k >= 0 && !compareAndSetState(j,k));

??????????? return k;

??????? }

?

??????? private static final long serialVersionUID = 2014338818796000944L;

?

??????? FairSync(int i)

??????? {

??????????? super(i);

??????? }

??? }

?

??? /**

???? * 非公平的Sync

???? * @author jin.xiong

???? *

???? */

??? static final class NonfairSync extends Sync

??? {

?

??????? protected int tryAcquireShared(int i)

??????? {

??????????? return nonfairTryAcquireShared(i);

??????? }

?

??????? private static final long serialVersionUID = -2694183684443567898L;

?

??????? NonfairSync(int i)

??????? {

??????????? super(i);

??????? }

??? }

?

??? static abstract class Sync extends AbstractQueuedSynchronizer

??? {

?

??? /**

??? ?* 获取状态

??? ?* @return

??? ?*/

??????? final int getPermits()

??????? {

??????????? return getState();

??????? }

?

??????? /**

???????? * 把状态-i

???????? * @param i

???????? * @return

???????? */

??????? final int nonfairTryAcquireShared(int i)

??????? {

??????????? int j;

??????????? int k;

??????????? do

??????????? {

??????????????? j = getState();

??????????????? k = j - i;

??????????? } while(k >= 0 && !compareAndSetState(j,k));

??????????? return k;

??????? }

?

??????? /**

???????? * 把状态+i

???????? */

??????? protected final boolean tryReleaseShared(int i)

??????? {

??????????? int j;

??????????? do

??????????????? j = getState();

??????????? while(!compareAndSetState(j, j + i));

??????????? return true;

??????? }

?

??????? /**

???????? * 状态-i 和nonfairTryAcquireShared作用一样

???????? * @param i

???????? */

??????? final void reducePermits(int i)

??????? {

??????????? int j;

??????????? int k;

??????????? do

??????????? {

??????????????? j = getState();

??????????????? k = j - i;

??????????? } while(!compareAndSetState(j, k));

??????? }

?

??????? /**

???????? * 把状态设为0

???? ????* @return

???????? */

??????? final int drainPermits()

??????? {

??????????? int i;

??????????? do

??????????????? i = getState();

??????????? while(i != 0 && !compareAndSetState(i,0));

??????????? return i;

??????? }

?

??????? private static final long serialVersionUID = 1192457210091910933L;

?

??????? Sync(int i)

??????? {

??????????? setState(i);

??????? }

??? }

?

?

??? /**

???? * 初始化容量为i的信号量

???? * @param i

???? */

??? public Semaphore(int i)

??? {

??????? sync = new NonfairSync(i);

??? }

?

??? public Semaphore(int i, boolean flag)

??? {

??????? sync = ((Sync) (flag ? ((Sync) (new FairSync(i))) :((Sync) (new NonfairSync(i)))));

??? }

?

??? public void acquire()

??????? throws InterruptedException

??? {

??????? sync.acquireSharedInterruptibly(1);

??? }

?

??? public void acquireUninterruptibly()

??? {

??????? sync.acquireShared(1);

??? }

?

??? public boolean tryAcquire()

??? {

??????? return sync.nonfairTryAcquireShared(1) >= 0;

??? }

?

??? public boolean tryAcquire(long l, TimeUnittimeunit)

??????? throws InterruptedException

??? {

??????? return sync.tryAcquireSharedNanos(1,timeunit.toNanos(l));

??? }

?

??? public void release()

??? {

??????? sync.releaseShared(1);

??? }

?

??? public void acquire(int i)

??????? throws InterruptedException

??? {

??????? if(i < 0)

??????? {

??????????? throw new IllegalArgumentException();

??????? } else

??????? {

??????????? sync.acquireSharedInterruptibly(i);

??????????? return;

??????? }

??? }

?

??? public void acquireUninterruptibly(int i)

??? {

??????? if(i < 0)

??????? {

??????????? throw new IllegalArgumentException();

??????? } else

??????? {

??????????? sync.acquireShared(i);

??????????? return;

??????? }

??? }

?

??? public boolean tryAcquire(int i)

??? {

??????? if(i < 0)

??????????? throw new IllegalArgumentException();

??????? else

??????????? return sync.nonfairTryAcquireShared(i) >= 0;

??? }

?

??? /**

???? * 获取信号量,如果没有则等待时间l

???? * @param i

???? * @param l

???? * @param timeunit

???? * @return

???? * @throws InterruptedException

???? */

??? public boolean tryAcquire(int i, long l, TimeUnittimeunit)

??????? throws InterruptedException

??? {

??????? if(i < 0)

??????????? throw new IllegalArgumentException();

??????? else

??????????? return sync.tryAcquireSharedNanos(i,timeunit.toNanos(l));

??? }

?

?? /**

??? * 释放i个信号量

??? * @param i

??? */

??? public void release(int i)

??? {

??????? if(i < 0)

??????? {

??????????? throw new IllegalArgumentException();

??????? } else

??????? {

??????????? sync.releaseShared(i);

??????????? return;

??????? }

??? }

?

??? public int availablePermits()

??? {

??????? return sync.getPermits();

??? }

?

??? /**

???? * 释放所有信号量

???? * @return

???? */

??? public int drainPermits()

??? {

??????? return sync.drainPermits();

??? }

?

??? protected void reducePermits(int i)

??? {

??????? if(i < 0)

??????? {

??????????? throw new IllegalArgumentException();

??????? } else

??????? {

??????????? sync.reducePermits(i);

??????????? return;

??????? }

??? }

?

??? public boolean isFair()

??? {

??????? return sync instanceof FairSync;

??? }

?

??? public final boolean hasQueuedThreads()

??? {

??????? return sync.hasQueuedThreads();

??? }

?

??? public final int getQueueLength()

??? {

??????? return sync.getQueueLength();

??? }

?

??? protected CollectiongetQueuedThreads()

??? {

??????? return sync.getQueuedThreads();

??? }

?

??? public String toString()

??? {

??????? return (new StringBuilder()).append(super.toString()).append("[Permits = ").append(sync.getPermits()).append("]").toString();

??? }

?

??? private static final long serialVersionUID = -3222578661600680210L;

??? private final Sync sync;

}

?

?

读书人网 >编程

热点推荐