前言
jdk 1.5
开始,新增了java.util.concurrent
包(以下简称j.u.c
),是一系列关于并发的工具类,是java并发编程必知必会的内容。今儿准备简单介绍一些常用的工具类。
原子类
java.util.concurrent.atomic
包提供了一些专门为线程安全设计的java操作原子类。 见名知意,类名基本以 Atomic
开头,表示这些类都是线程安全的。
明明提供线程安全的锁,为什么还要提供这些基础的类呢?
这就必须说明,这些原子类与通过synchronized
加锁实现的对象不同,以往通过synchronized
等加锁的实现我们认为是一种悲观锁的体现, 即不论谁来操作,都假设最坏的情况,必须加锁独占,让其他需要操作的线程挂起等待锁释放。这种情况是具有比较大的开销的,线程抢占锁的花费的时间代价非常高。
所以这里就体现了原子类存在的强大意义。原子类的底层代码是利用了现代CPU的CAS
指令来完成赋值操作的。 CAS
是乐观锁技术,原称:Compare and Swap
,比较并交换。操作之前会比较内存值V与预期值A是否一致,并且仅当V == A时,才会把V赋值为新值B。
举个例子,AtomicInteger
的incrementAndGet
操作:
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
其调用了sun.misc.Unsafe
的方法,这里的compareAndSwapInt
是一个本地方法,起调用了CPU的CAS指令。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
锁
java.util.concurrent.locks
包下提供了很方方便加锁的工具。其下最知名的应该是ReentrantLock
,但是提到ReentrantLock
之前,最需要提的是AbstractQueuedSynchronizer
。
AQS
AbstractQueuedSynchronizer
简称AQS
,见名知意,抽象的基于队列的同步器,本质上是提供并定义了一套多线程访问共享资源的模板。 这个模板应用广泛,大部分锁的实现都基于此:
源码中的描述:
The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue.
CLH
是一种自旋锁,提供先来先服务的公平性,其基于链表,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。
AQS
提供了两种资源共享方式:Exclusive
(独占) 和 Shared
(共享)。
我们开发中自定义队列同步器很少,更多的实现都在jdk中。
ReentrantLock
ReentrantLock
是开发中比较常用的显示锁。其性能在jdk1.5的时候是比synchronized
关键字高出许多的,但是jdk1.6之后二者实际上不分伯仲。 但是ReentrantLock
还是有其独特的特性:轮询锁、定时锁、可中断锁、公平锁等,并且使用更加灵活。
其基本使用方式:
Lock lock = new ReentrantLock();
...
lock.lock(); // 上锁
try {
// 处理共享资源
} finally {
lock.unlock(); // 解锁
}
轮询锁与定时锁
轮询锁与定时锁是由tryLock
方法实现的,与无条件的锁获取方式相比,它具有跟完善的错误回复机制。
轮询锁之转账问题
假设在银行系统把账户A的钱转给账户B,必须保证多线程并发安全,加锁是必不可少的,但是在获取多个锁的情况下,如果通过内置锁,很可能发生死锁问题。
解决方式:通过tryLock
尝试同时获取多个锁,如果不能同时获取,就回退重试。
账户类,每个账户都持有一把锁:
@Data
@Builder
class Account {
private BigDecimal balance; // 账户余额
private String name; // 账户名称
public Lock lock;
void debit(BigDecimal amount) {
balance = balance.subtract(amount);
}
void credit(BigDecimal amount) {
balance = balance.add(amount);
}
@Override
public String toString() {
return new StringBuilder(name).append("的余额为:").append(balance).toString();
}
}
使用tryLock
尝试获取锁,进行两个账户的转账:
public class TransferService {
public boolean transferAccount(Account fromAcct, Account toAcct, BigDecimal amount, long timeout) throws InterruptedException {
long stopTime = System.currentTimeMillis() + timeout;
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0) {
throw new RuntimeException("转账账户余额不足");
} else {
fromAcct.debit(amount);
toAcct.credit(amount);
System.out.println("转账人:" + fromAcct.toString());
System.out.println("被转账人:" + toAcct.toString());
return true;
}
} finally {
toAcct.lock.unlock();
}
} else {
System.out.println("toAcct.lock.tryLock() false");
}
} finally {
fromAcct.lock.unlock();
}
} else {
System.out.println("fromAcct.lock.tryLock() false");
}
if (System.currentTimeMillis() > stopTime) {
return false;
}
Thread.sleep(1000L);
}
}
public static void main(String[] args) {
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
final Account fromAcct = Account.builder().lock(lock1).name("老王").balance(new BigDecimal(1000)).build();
final Account toAcct = Account.builder().lock(lock2).name("老李").balance(new BigDecimal(1000)).build();
TransferService service = new TransferService();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
service.transferAccount(fromAcct, toAcct, new BigDecimal(50), 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++) {
new Thread(runnable).start();
}
}
}
如下运行结果,可以看到尝试获取锁多次。如果修改调用参数里的timeout 值,可能会出现转账失败的情况。
fromAcct.lock.tryLock() false
转账人:老王的余额为:950
被转账人:老李的余额为:1050
转账人:老王的余额为:900
fromAcct.lock.tryLock() false
被转账人:老李的余额为:1100
转账人:老王的余额为:850
被转账人:老李的余额为:1150
fromAcct.lock.tryLock() false
转账人:老王的余额为:800
被转账人:老李的余额为:1200
fromAcct.lock.tryLock() false
转账人:老王的余额为:750
被转账人:老李的余额为:1250
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:700
被转账人:老李的余额为:1300
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:650
被转账人:老李的余额为:1350
fromAcct.lock.tryLock() false
fromAcct.lock.tryLock() false
转账人:老王的余额为:600
被转账人:老李的余额为:1400
fromAcct.lock.tryLock() false
转账人:老王的余额为:550
被转账人:老李的余额为:1450
转账人:老王的余额为:500
被转账人:老李的余额为:1500
定时锁的使用
关键方法tryLock(long timeout, TimeUtil unit)
,即申请获取锁设置等待时间,在此等待时间内尝试获取锁,如果锁被其他线程占有,则返回false. 这种方式可以有效的避免死锁的发生。
public class Service {
public ReentrantLock lock = new ReentrantLock();
public void waitMethod() {
try {
if (lock.tryLock(3, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + "获得锁的时间:" + System.currentTimeMillis());
Thread.sleep(10000);
} else {
System.out.println(Thread.currentThread().getName() + "没有获得锁");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
public class Run_tryLock_param {
public static void main(String[] args) {
final Service service = new Service();
Runnable runnable = new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " 调用waitMethod时间:" + System.currentTimeMillis());
service.waitMethod();
}
};
Thread threadA = new Thread(runnable);
threadA.setName("A");
threadA.start();
Thread threadB = new Thread(runnable);
threadB.setName("B");
threadB.start();
}
}
如上所示,运行结果:
A 调用waitMethod时间:1534690399458
B 调用waitMethod时间:1534690399458
A获得锁的时间:1534690399459
B没有获得锁
可中断锁
java的内置锁synchronized
就是不可中断的锁, 其不可中断的阻塞机制使得实现可取消的任务变得复杂。
Lock
是可中断锁,lockInterruptibly
可以使得在获得锁的同时保持对中断的响应。
lockInterruptibly
基本使用方式:
public void method() throws InterruptedException {
lock.lockInterruptibly();
try {
//do something
}
finally {
lock.unlock();
}
}
读写锁
ReentrantReadWriteLock
读写锁,它除了实现了Lock
接口外,同时也实现了ReadWriteLock
接口。
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
总所周知,ReentrantLock
是排它锁,无论什么操作(read or write),其同一时间只能一个线程访问。但是实际应用中,读操作往往是占据最多的场景,那么绝大部分读取的场景能否采用共享锁呢?ReentrantReadWriteLock
就能满足这一点,它允许读读共享,但是读写,写写是排他的操作。
应用场景,见java doc api提供了一个读写锁的生动的使用范例:
Here is a code sketch showing how to perform lock downgrading after updating a cache (exception handling is particularly tricky when handling multiple locks in a non-nested fashion):
`
java class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() { rwl.readLock().lock(); // 1.加读锁 if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); // 2.释放读锁,因为要加写锁,不支持锁升级,只能先解读锁再加写锁 rwl.writeLock().lock();// 3.加写锁 try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { // 见上面英文解释,简单的说避免重复写入数据 data = … cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock();// 4.锁降级,写锁变读锁 } finally { rwl.writeLock().unlock(); // Unlock write, still hold read 5.见英文,解写锁,仍然能读 } }
try {
use(data);
} finally {
rwl.readLock().unlock(); // 6.最终释放读锁
}
} } `
为什么要有锁降级?
因为锁降级的过程能避免在写锁释放,加读锁的过程中,此时读取的数据不会被其他线程竞争到写锁更新数据导致脏读问题。
总结
j.u.c
是实现java高并发必知必会的重要内容。在很多web相关框架中都有体现,也是java程序猿进阶的必备技能之一,多学多用。