Fork me on GitHub

浅析java并发包(四):闭锁与栅栏

前言

闭锁(CountDownlatch)和栅栏(CyclicBarrier)功能看起来相似,都是同步工具类, 看起来都是可以阻塞一个操作,等待其依赖的一组操作完成之后再执行。但是害死有所区别。

闭锁

闭锁(CountDownlatch):允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

注意这里有两种线程,一种是等待的线程,一种是执行线程。

它的应用场景比如:

  • 确保某个计算在其需要的所有资源都初始化后继续执行。即,这个计算依赖于资源初始化,必须等待所有依赖资源初始化后执行。
  • 确保某个服务在其依赖的所有其他服务启动之后才能启动。
  • 等待直到某个操作的所有参与者都就绪后再继续执行。

闭锁示例

举例说明,比如有三个玩家,游戏开始必须在三个玩家就绪后才能开始。 这里用两个类来模拟这种情况,分别是玩家(Player)和游戏(Game).

玩家:

public class Player implements Runnable {

    private CountDownLatch downLatch;
    private String name;

    public Player(CountDownLatch downLatch, String name) {
        this.downLatch = downLatch;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("玩家:" + name + "正来赶来游戏场地的路上!");
        try {
            Thread.sleep(new Random().nextInt(10000));
        } catch (InterruptedException ie) {
        }
        System.out.println("玩家:" + name + "已准备就绪!");
        this.downLatch.countDown();

    }

}

游戏:

public class Game implements Runnable {
    private CountDownLatch downLatch;

    public Game(CountDownLatch downLatch) {
        this.downLatch = downLatch;
    }

    @Override
    public void run() {
        System.out.println("游戏尚未开始,正在等待玩家就绪...");
        try {
            this.downLatch.await();
        } catch (InterruptedException ie) {
        }
        System.out.println("所有玩家已就绪,游戏开始");
        this.downLatch.countDown();

    }


}

运行测试程序:

public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CountDownLatch latch = new CountDownLatch(3);

    Player p1 = new Player(latch, "A");
    Player p2 = new Player(latch, "B");
    Player p3 = new Player(latch, "C");
    Game game = new Game(latch);

    executor.execute(p1);
    executor.execute(p2);
    executor.execute(p3);
    executor.execute(game);
    executor.shutdown();
}

结果:

玩家:A正来赶来游戏场地的路上!
玩家:C正来赶来游戏场地的路上!
游戏尚未开始,正在等待玩家就绪...
玩家:B正来赶来游戏场地的路上!
玩家:C已准备就绪!
玩家:A已准备就绪!
玩家:B已准备就绪!
所有玩家已就绪,游戏开始

可见,Game线程和所有Player都持有同一把闭锁,Game线程等待所有Player闭锁释放后才能继续执行,否则则一直等待。

关键方法浅析

原理分析:CountDownLatch里初始化会存有一个正数计算器,每次做countDown()操作时会把计算器减1,await()需要等待 计数器为0时才能释放,否则一直阻塞。

构造器给定一个初始化的计数值:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync是闭锁内部类,继承了AQS,并重写了tryAcquireShared(int acquires)tryReleaseShared(int releases)方法, 可见其采用共享锁实现。

await()内部使用AQSacquireSharedInterruptibly(int arg)

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

AQS中的acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

关键点在这个tryAcquireShared方法中,闭锁内部类Sync重写了这个方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

这里的getState就是计数器的个数,当个数为0时可以获取到共享锁。

再看countDown(),其目的是减少计数器个数。

public void countDown() {
    sync.releaseShared(1);
}

其内部也是调用AQSreleaseShared(int arg)方法来释放共享锁同步状态:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

这里的tryReleaseShared是内部类Sync重写的:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

可见此处计算器进行了减1操作,知道状态为0时释放锁。

栅栏

栅栏(CyclicBarrier)类似于闭锁,但其还是不同。

api中介绍:

允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

再引用java并发编程实战里的解释:

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。 栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

所以关注点在于”栅栏”(共同屏障点)这个位置,这一组线程必须都到达栅栏以后才可以都继续执行。

栅栏示例

比如三个人共同商议6点钟去麦当劳碰头,等到三个人都到达以后,一起商讨去干什么。 这里用栅栏实现,只需要一个Person类:

public class Persion implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private String name;

    public Persion(CyclicBarrier cyclicBarrier, String name) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println(name + "正来赶来麦当劳的路上...");
        try {
            Thread.sleep(new Random().nextInt(10000));
            System.out.println(name + "到达麦当劳...");
            cyclicBarrier.await();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + "说:大家都到齐了,开始商量干什么吧!");
    }

}

运行程序:

public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CyclicBarrier barrier = new CyclicBarrier(3);

    Persion p1 = new Persion(barrier, "A");
    Persion p2 = new Persion(barrier, "B");
    Persion p3 = new Persion(barrier, "C");

    executor.execute(p1);
    executor.execute(p2);
    executor.execute(p3);
    executor.shutdown();
}

运行结果:

A正来赶来麦当劳的路上...
C正来赶来麦当劳的路上...
B正来赶来麦当劳的路上...
B到达麦当劳...
A到达麦当劳...
C到达麦当劳...
C说:大家都到齐了,开始商量干什么吧!
B说:大家都到齐了,开始商量干什么吧!
A说:大家都到齐了,开始商量干什么吧!

可见,所有线程都等到了其栅栏点才可以继续执行。

可能有时候需要我们的执行线程希望做到类似闭锁的情况,即到达栅栏点时,只执行一次处理。 CyclicBarrier有个这么个构造器(public CyclicBarrier(int parties, Runnable barrierAction))可以实现:

修改上面的Persion类:

public class Persion implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private String name;

    public Persion(CyclicBarrier cyclicBarrier, String name) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println(name + "正来赶来麦当劳的路上...");
        try {
            Thread.sleep(new Random().nextInt(10000));
            System.out.println(name + "到达麦当劳...");
            cyclicBarrier.await();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

执行程序:

public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
        @Override
        public void run() {
            System.out.println("大家都到齐了,开始商量干什么吧!");
        }
    });

    Persion p1 = new Persion(barrier, "A");
    Persion p2 = new Persion(barrier, "B");
    Persion p3 = new Persion(barrier, "C");

    executor.execute(p1);
    executor.execute(p2);
    executor.execute(p3);
    executor.shutdown();
}

运行结果:

A正来赶来麦当劳的路上...
C正来赶来麦当劳的路上...
B正来赶来麦当劳的路上...
C到达麦当劳...
A到达麦当劳...
B到达麦当劳...
大家都到齐了,开始商量干什么吧!

关键方法浅析

构造器有两个:

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

parties是拦截的线程数,当拦截的线程数达到这个值之前,线程会一直等待。 barrierAction是当拦截的线程数达到parties时,由最后一个进入的线程执行此操作。

await()方法,所有的参与者在此等待:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);//不超时等待
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 每次进度的线程,此count值减1,减到0时,触发barrierCommand任务
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration(); // 此处唤醒所有等待线程
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

简单的说,dowait方法使得最后一个到达的线程到达之后,index == 0,执行Runnable任务,唤醒所有等待的线程。

总结

闭锁和栅栏是实现并发同步操作的两把利器,有所相似又各有不同,抓住其原理关键点就很好理解了。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%