Fork me on GitHub

浅析java并发包(一):原子类和锁

前言

jdk 1.5开始,新增了java.util.concurrent包(以下简称j.u.c),是一系列关于并发的工具类,是java并发编程必知必会的内容。今儿准备简单介绍一些常用的工具类。

原子类

java.util.concurrent.atomic包提供了一些专门为线程安全设计的java操作原子类。 见名知意,类名基本以 Atomic 开头,表示这些类都是线程安全的。

明明提供线程安全的锁,为什么还要提供这些基础的类呢?

这就必须说明,这些原子类与通过synchronized加锁实现的对象不同,以往通过synchronized等加锁的实现我们认为是一种悲观锁的体现, 即不论谁来操作,都假设最坏的情况,必须加锁独占,让其他需要操作的线程挂起等待锁释放。这种情况是具有比较大的开销的,线程抢占锁的花费的时间代价非常高。

所以这里就体现了原子类存在的强大意义。原子类的底层代码是利用了现代CPU的CAS指令来完成赋值操作的。 CAS是乐观锁技术,原称:Compare and Swap,比较并交换。操作之前会比较内存值V与预期值A是否一致,并且仅当V == A时,才会把V赋值为新值B。

举个例子,AtomicIntegerincrementAndGet操作:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

其调用了sun.misc.Unsafe的方法,这里的compareAndSwapInt是一个本地方法,起调用了CPU的CAS指令。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

java.util.concurrent.locks包下提供了很方方便加锁的工具。其下最知名的应该是ReentrantLock,但是提到ReentrantLock之前,最需要提的是AbstractQueuedSynchronizer

AQS

AbstractQueuedSynchronizer简称AQS,见名知意,抽象的基于队列的同步器,本质上是提供并定义了一套多线程访问共享资源的模板。 这个模板应用广泛,大部分锁的实现都基于此:

源码中的描述:

The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue.

CLH是一种自旋锁,提供先来先服务的公平性,其基于链表,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

AQS提供了两种资源共享方式:Exclusive(独占) 和 Shared(共享)。

我们开发中自定义队列同步器很少,更多的实现都在jdk中。

ReentrantLock

ReentrantLock是开发中比较常用的显示锁。其性能在jdk1.5的时候是比synchronized关键字高出许多的,但是jdk1.6之后二者实际上不分伯仲。 但是ReentrantLock还是有其独特的特性:轮询锁、定时锁、可中断锁、公平锁等,并且使用更加灵活。

其基本使用方式:

Lock lock = new ReentrantLock();

...

lock.lock(); // 上锁
try {
    // 处理共享资源
} finally {
    lock.unlock(); // 解锁
} 

轮询锁与定时锁

轮询锁与定时锁是由tryLock方法实现的,与无条件的锁获取方式相比,它具有跟完善的错误回复机制。

轮询锁之转账问题

假设在银行系统把账户A的钱转给账户B,必须保证多线程并发安全,加锁是必不可少的,但是在获取多个锁的情况下,如果通过内置锁,很可能发生死锁问题。

解决方式:通过tryLock尝试同时获取多个锁,如果不能同时获取,就回退重试。

账户类,每个账户都持有一把锁:

@Data
@Builder
class Account {

    private BigDecimal balance; // 账户余额
    private String name; // 账户名称

    public Lock lock;

    void debit(BigDecimal amount) {
        balance = balance.subtract(amount);
    }

    void credit(BigDecimal amount) {
        balance = balance.add(amount);
    }

    @Override
    public String toString() {
        return new StringBuilder(name).append("的余额为:").append(balance).toString();
    }
}

使用tryLock尝试获取锁,进行两个账户的转账:

public class TransferService {

    public boolean transferAccount(Account fromAcct, Account toAcct, BigDecimal amount, long timeout) throws InterruptedException {
        long stopTime = System.currentTimeMillis() + timeout;

        while (true) {
            if (fromAcct.lock.tryLock()) {
                try {
                    if (toAcct.lock.tryLock()) {
                        try {
                            if (fromAcct.getBalance().compareTo(amount) < 0) {
                                throw new RuntimeException("转账账户余额不足");
                            } else {
                                fromAcct.debit(amount);
                                toAcct.credit(amount);
                                System.out.println("转账人:" + fromAcct.toString());
                                System.out.println("被转账人:" + toAcct.toString());
                                return true;
                            }
                        } finally {
                            toAcct.lock.unlock();
                        }
                    } else {
                        System.out.println("toAcct.lock.tryLock() false");
                    }
                } finally {
                    fromAcct.lock.unlock();
                }
            } else {
                System.out.println("fromAcct.lock.tryLock() false");
            }

            if (System.currentTimeMillis() > stopTime) {
                return false;
            }

            Thread.sleep(1000L);
        }
    }

    public static void main(String[] args) {
        Lock lock1 = new ReentrantLock();
        Lock lock2 = new ReentrantLock();
        final Account fromAcct = Account.builder().lock(lock1).name("老王").balance(new BigDecimal(1000)).build();
        final Account toAcct = Account.builder().lock(lock2).name("老李").balance(new BigDecimal(1000)).build();

        TransferService service = new TransferService();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    service.transferAccount(fromAcct, toAcct, new BigDecimal(50), 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            new Thread(runnable).start();
        }

    }

}

如下运行结果,可以看到尝试获取锁多次。如果修改调用参数里的timeout 值,可能会出现转账失败的情况。

fromAcct.lock.tryLock() false
转账人:老王的余额为:950
被转账人:老李的余额为:1050
转账人:老王的余额为:900
fromAcct.lock.tryLock() false
被转账人:老李的余额为:1100
转账人:老王的余额为:850
被转账人:老李的余额为:1150
fromAcct.lock.tryLock() false
转账人:老王的余额为:800
被转账人:老李的余额为:1200
fromAcct.lock.tryLock() false
转账人:老王的余额为:750
被转账人:老李的余额为:1250
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:700
被转账人:老李的余额为:1300
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:650
被转账人:老李的余额为:1350
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:600
被转账人:老李的余额为:1400
fromAcct.lock.tryLock() false
转账人:老王的余额为:550
被转账人:老李的余额为:1450
转账人:老王的余额为:500
被转账人:老李的余额为:1500
定时锁的使用

关键方法tryLock(long timeout, TimeUtil unit),即申请获取锁设置等待时间,在此等待时间内尝试获取锁,如果锁被其他线程占有,则返回false. 这种方式可以有效的避免死锁的发生。

public class Service {
    public ReentrantLock lock = new ReentrantLock();

    public void waitMethod() {
        try {
            if (lock.tryLock(3, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + "获得锁的时间:" + System.currentTimeMillis());
                Thread.sleep(10000);
            } else {
                System.out.println(Thread.currentThread().getName() + "没有获得锁");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

public class Run_tryLock_param {
    public static void main(String[] args) {
        final Service service = new Service();
        Runnable runnable = new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 调用waitMethod时间:" + System.currentTimeMillis());
                service.waitMethod();
            }
        };

        Thread threadA = new Thread(runnable);
        threadA.setName("A");
        threadA.start();

        Thread threadB = new Thread(runnable);
        threadB.setName("B");
        threadB.start();
    }
}

如上所示,运行结果:

A 调用waitMethod时间:1534690399458
B 调用waitMethod时间:1534690399458
A获得锁的时间:1534690399459
B没有获得锁

可中断锁

java的内置锁synchronized就是不可中断的锁, 其不可中断的阻塞机制使得实现可取消的任务变得复杂。

Lock是可中断锁,lockInterruptibly可以使得在获得锁的同时保持对中断的响应。

lockInterruptibly基本使用方式:

public void method() throws InterruptedException {
    lock.lockInterruptibly();
    try {  
     //do something
    }
    finally {
        lock.unlock();
    }  
}

读写锁

ReentrantReadWriteLock读写锁,它除了实现了Lock接口外,同时也实现了ReadWriteLock接口。

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

总所周知,ReentrantLock是排它锁,无论什么操作(read or write),其同一时间只能一个线程访问。但是实际应用中,读操作往往是占据最多的场景,那么绝大部分读取的场景能否采用共享锁呢?ReentrantReadWriteLock就能满足这一点,它允许读读共享,但是读写,写写是排他的操作。

应用场景,见java doc api提供了一个读写锁的生动的使用范例:

Here is a code sketch showing how to perform lock downgrading after updating a cache (exception handling is particularly tricky when handling multiple locks in a non-nested fashion): `java class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() { rwl.readLock().lock(); // 1.加读锁 if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); // 2.释放读锁,因为要加写锁,不支持锁升级,只能先解读锁再加写锁 rwl.writeLock().lock();// 3.加写锁 try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { // 见上面英文解释,简单的说避免重复写入数据 data = … cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock();// 4.锁降级,写锁变读锁 } finally { rwl.writeLock().unlock(); // Unlock write, still hold read 5.见英文,解写锁,仍然能读 } }

 try {
   use(data);
 } finally {
   rwl.readLock().unlock(); // 6.最终释放读锁
 }

} } ` 为什么要有锁降级?
因为锁降级的过程能避免在写锁释放,加读锁的过程中,此时读取的数据不会被其他线程竞争到写锁更新数据导致脏读问题。

总结

j.u.c是实现java高并发必知必会的重要内容。在很多web相关框架中都有体现,也是java程序猿进阶的必备技能之一,多学多用。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%