JUC——多线程同步控制工具

一、重入锁 ReentrantLock

  • 重入锁可以完全替代关键字 synchronized。在 JDK 1.5 之前,重入锁的性能远高于关键字 synchronized;JDK 1.6 开始,两者的性能差距并不大
  • 重入锁有着显式的操作过程,对逻辑控制有更强的灵活性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ReenterLock implements Runnable {

static ReentrantLock lock = new ReentrantLock();
static int i = 0;

@Override
public void run() {
for (int j = 0; j < 100000; j++) {
lock.lock(); // 如果锁被占用则等待
try {
i++;
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
ReenterLock instance = new ReenterLock();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i); // 200000
}
}
  • 重入锁可以在一个线程反复进入
1
2
3
4
5
6
7
8
9
lock.lock();
lock.lock();
try {
i++;
} finally {
lock.unlock();
lock.unlock();
// lock.unlock(); // java.lang.IllegalMonitorStateException
}
  • 重入锁可以中断响应:等待锁的过程中,程序可以根据需要取消对锁的请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class IntLock implements Runnable {

public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;

public IntLock(int lock) {
this.lock = lock;
}

@Override
public void run() {
try {
// 产生一个死锁
if (lock == 1) {
lock1.lockInterruptibly(); // 等待锁时可以被中断
ThreadUtil.sleep(500);
lock2.lockInterruptibly();
} else {
lock2.lockInterruptibly();
ThreadUtil.sleep(500);
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
System.out.println(lock + " occur " + e);
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(lock + " thread exit");
}
}

public static void main(String[] args) {
Thread t1 = new Thread(new IntLock(1));
Thread t2 = new Thread(new IntLock(2));
t1.start();
t2.start();
ThreadUtil.sleep(3000);
t2.interrupt(); // t2放弃对lock1的申请,同时释放lock2,抛出异常
}
}
/*
2 occur java.lang.InterruptedException
2 thread exit
1 thread exit
*/
  • 锁申请等待限时:给定一个等待时间,让线程自动放弃请求锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TimeLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();

@Override
public void run() {
try {
String name = Thread.currentThread().getName();
if (lock.tryLock(2, TimeUnit.SECONDS)) { // 超时没有获得锁就返回false
System.out.println(name + " get lock success, " + System.currentTimeMillis());
ThreadUtil.sleep(4000);
} else {
System.out.println(name + " get lock failed, " + System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

public static void main(String[] args) {
TimeLock instance = new TimeLock();
Thread t1 = new Thread(instance, "t1");
Thread t2 = new Thread(instance, "t2");
t1.start();
ThreadUtil.sleep(1000);
t2.start();
}
}
/*
t1 get lock success, 1689251605357
t2 get lock failed, 1689251608357
*/
  • tryLock() 可以不带参数运行:当前线程会尝试获得锁,如果锁未被其他线程占用,则申请成功,并立即返回 true,否则立即返回 false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class TryLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();

@Override
public void run() {
String name = Thread.currentThread().getName();
if ("t1".equals(name)) {
while (true) {
if (lock1.tryLock()) {
try {
ThreadUtil.sleep(500);
if (lock2.tryLock()) {
System.out.println(name + " done");
lock2.unlock();
return;
}
} finally {
lock1.unlock();
}
}
}
} else {
while (true) {
if (lock2.tryLock()) {
try {
ThreadUtil.sleep(500);
if (lock1.tryLock()) {
System.out.println(name + " done");
lock1.unlock();
return;
}
} finally {
lock2.unlock();
}
}
}
}
}

public static void main(String[] args) {
TryLock instance = new TryLock();
Thread t1 = new Thread(instance, "t1");
Thread t2 = new Thread(instance, "t2");
t1.start();
t2.start();
}
}
/*
t2 done
t1 done
*/
  • 大多数情况下,锁的申请都是非公平的:线程 A 和 B 同时申请锁,系统会从这个锁的等待队列中随机挑选一个
  • 根据系统的调度,一个线程会倾向于再次获取己经持有的锁,这种分配方式是高效的,但是无公平性可言
  • 公平锁会按照申请时间的先后顺序,依次赋予锁,保证了不会产生饥饿现象
  • 要实现公平锁必然要求系统维护一个有序队列,因此公平锁的实现成本比较高,性能也非常低下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FairLock implements Runnable {
public static ReentrantLock fairLock = new ReentrantLock(true);

@Override
public void run() {
while (true) {
try {
fairLock.lock();
System.out.println(Thread.currentThread().getName() + " get lock");
} finally {
fairLock.unlock();
}
}
}

public static void main(String[] args) {
FairLock instance = new FairLock();
Thread t1 = new Thread(instance, "t1");
Thread t2 = new Thread(instance, "t2");
t1.start();
t2.start();
}
}
/*
t1 get lock
t2 get lock
t1 get lock
t2 get lock
*/
  • 在重入锁的实现中,主要包含三个要素:
  • 原子状态:原子状态使用 CAS 操作来存储当前锁的状态,判断锁是否己经被别的线程持有了
  • 等待队列:所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作
  • 阻塞原语park()unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起

二、重入锁的等待 Condition

  • Condition 对象与 wait()notify() 方法的作用大致相同,可以让线程在合适的时间等待,或在某一个特定的时刻得到通知,它与重入锁相关联
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
// 生成一个与当前重入锁绑定的Condition实例
public static Condition condition = lock.newCondition();

@Override
public void run() {
try {
lock.lock();
condition.await(); // 线程等待,释放lock
System.out.println("Thread running");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
Thread t1 = new Thread(new ReenterLockCondition());
t1.start();
ThreadUtil.sleep(2000);
// 通知线程继续执行
lock.lock();
condition.signal();
lock.unlock();
}
}
  • Condition 接口提供的基本方法如下:
1
2
3
4
5
6
7
8
9
10
11
// 使当前线程等待,同时释放当前锁。可被中断。使用前当前线程需获取锁
// 当其他线程中使用signal()或signalAll()时,线程尝试重新获得锁并继续执行
void await() throws InterruptedException;
// 不会在等待过程中响应中断
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 从当前对象的等待队列中随机唤醒一个线程。使用前当前线程需获取锁
void signal();
void signalAll();
  • JDK 内部,重入锁和 Condition 对象被广泛使用,如 ArrayBlockingQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁
try {
while (count == items.length) // 当前队列已满
notFull.await(); // 等待队列有足够空间
enqueue(e); // 被通知时,插入元素
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁
try {
while (count == 0) // 当前队列为空
notEmpty.await(); // 等待队列有数据
return dequeue(); // 被通知时,返回元素
} finally {
lock.unlock();
}
}

三、信号量 Semaphore

  • 信号量是对锁的扩展:无论是 synchronized 还是 ReentrantLock,一次只允许一个线程访问一个资源,而信号量可以指定多个线程同时访问某一个资源
1
2
3
4
5
6
7
8
9
10
// 信号量的准入数, 是否公平
public Semaphore(int permits) {...}
public Semaphore(int permits, boolean fair) {...}

// 一个线程每次只能申请一个许可
public void acquire() throws InterruptedException {...} // 获得准入许可,没有时等待
public void acquireUninterruptibly() {...} // 不响应中断
public boolean tryAcquire() {...} // 获得准入许可,直接返回不等待
public boolean tryAcquire(long timeout, TimeUnit unit) {...}
public void release() {...} // 释放许可
  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SemapTest implements Runnable {

Semaphore sem = new Semaphore(5);

@Override
public void run() {
try {
sem.acquire();
System.out.println(Thread.currentThread().getId() + " in");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemapTest instance = new SemapTest();
for (int i = 0; i < 20; i++) {
exec.submit(instance);
}
}
}
/*
21 in
24 in
22 in
23 in
20 in
21 done
24 done
20 done
23 done
22 done
*/

四、读写锁 ReadWriteLock

  • ReadWriteLock 是 JDK 5 提供的读写分离锁,可以有效地减少锁竞争
  • 读写锁允许多个线程同时读,但写写操作和读写操作间依然是需要相互等待和持有锁的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ReadWriteLockTest {
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
static int value;

static void handleRead(Lock lock) {
try {
lock.lock();
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + " read at " +
System.currentTimeMillis() + ", value: " + value);
} finally {
lock.unlock();
}
}

static void handleWrite(Lock lock, int newValue) {
try {
lock.lock();
ThreadUtil.sleep(1000);
value = newValue;
System.out.println(Thread.currentThread().getName() + " write at " +
System.currentTimeMillis() + ", value: " + value);
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
Runnable readRunnable = () -> handleRead(readLock);
Runnable writeRunnable = () -> handleWrite(writeLock, RandomUtil.randomInt());

for (int i = 0; i < 10; i++) {
new Thread(readRunnable).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeRunnable).start();
}
}
}
/*
Thread-2 read at 1689596379046, value: 0
Thread-6 read at 1689596379046, value: 0
Thread-1 read at 1689596379046, value: 0
Thread-3 read at 1689596379046, value: 0
Thread-9 read at 1689596379046, value: 0
Thread-8 read at 1689596379046, value: 0
Thread-5 read at 1689596379046, value: 0
Thread-4 read at 1689596379046, value: 0
Thread-7 read at 1689596379046, value: 0
Thread-0 read at 1689596379046, value: 0
Thread-11 write at 1689596380052, value: -1587147727
Thread-10 write at 1689596381054, value: 658881926
*/

五、倒计数器 CountDownLatch

  • 倒计数器 CountDownLatch 通常用来控制线程等待,它可以让某一个线程等待直到倒计数结束,再开始执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CountDownLatchTest implements Runnable {
static CountDownLatch latch = new CountDownLatch(10); // 计数器的计时个数

@Override
public void run() {
ThreadUtil.sleep(RandomUtil.randomInt(10) * 1000L);
latch.countDown(); // 倒计数器减1
System.out.println(Thread.currentThread().getName() + " done at " + System.currentTimeMillis());
}

public static void main(String[] args) throws InterruptedException {
CountDownLatchTest instance = new CountDownLatchTest();;

ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
exec.submit(instance);
}
latch.await(); // 等待计数器的个数为0,才执行后续代码
System.out.println("Main done at " + System.currentTimeMillis());
exec.shutdown();
}
}
/*
pool-1-thread-6 done at 1689597337020
pool-1-thread-9 done at 1689597338020
pool-1-thread-3 done at 1689597339020
pool-1-thread-7 done at 1689597339020
pool-1-thread-2 done at 1689597340020
pool-1-thread-4 done at 1689597341020
pool-1-thread-10 done at 1689597344020
pool-1-thread-8 done at 1689597345020
pool-1-thread-1 done at 1689597345020
pool-1-thread-5 done at 1689597345020
Main done at 1689597345020
*/

六、循环栅栏 CyclicBarrier

  • 循环栅栏 CyclicBarrier 通常用来阻止线程继续执行,让线程在栅栏外等待,达到一定数量后再执行。其计数器可以反复使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class CyclicBarrierTest implements Runnable {
// 计数器个数,计数器一次计数完成后执行的操作
static CyclicBarrier barrier = new CyclicBarrier(10, () -> {
System.out.println("in barrier");
});

@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " in at " + System.currentTimeMillis());
barrier.await();

ThreadUtil.sleep(RandomUtil.randomInt(5) * 1000L);
System.out.println(Thread.currentThread().getName() + " done at " + System.currentTimeMillis());
barrier.await();
} catch (InterruptedException e) {
// 等待时线程被中断
e.printStackTrace();
} catch (BrokenBarrierException e) {
// 栅栏破损,系统无法等到所有线程到齐
e.printStackTrace();
}
}

public static void main(String[] args) {
CyclicBarrierTest instance = new CyclicBarrierTest();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(instance);
threads[i].start();
//if (i == 5) {
// // 此时会得到一个InterruptedException和9个BrokenBarrierException
// threads[0].interrupt();
//}
}
}
}
/*
Thread-0 in at 1689599996399
Thread-4 in at 1689599996399
Thread-3 in at 1689599996399
Thread-2 in at 1689599996399
Thread-1 in at 1689599996399
Thread-6 in at 1689599996399
Thread-5 in at 1689599996399
Thread-8 in at 1689599996399
Thread-7 in at 1689599996399
Thread-9 in at 1689599996399
in barrier
Thread-0 done at 1689599996420
Thread-6 done at 1689599997420
Thread-2 done at 1689599998420
Thread-4 done at 1689599998420
Thread-9 done at 1689599998420
Thread-7 done at 1689599998420
Thread-1 done at 1689599999420
Thread-5 done at 1689599999420
Thread-3 done at 1689600000420
Thread-8 done at 1689600000420
in barrier
*/

七、线程阻塞工具 LockSupport

  • LockSupport 可以在线程内任意位置让线程阻塞。与 Thread.suspend() 方法相比,它弥补了 resume() 方法导致线程无法继续执行的情况;和 Object.wait() 方法相比,它不需要先获得某个对象的锁,也不会抛出 InterruptedException 异常
  • LockSupport 有 parkNanos()parkUntil() 等方法,可以实现限时的等待
  • LockSupport 内部使用类似信号量的机制,它为每个线程准备了一个许可,默认不可用。如果许可可用,park() 方法会立即返回,并消费这个许可;如果许可不可用,就会阻塞;unpark() 方法会使这个许可变为可用(和信号量不同,只能拥有一个许可,不能累加)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LockSupportTest {
static Object obj = new Object();

static class ChangeObjThread extends Thread {
public ChangeObjThread(String name) {
super.setName(name);
}

@Override
public void run() {
synchronized (obj) {
System.out.println(getName() + " in at " + System.currentTimeMillis());
LockSupport.park(); // 阻塞当前线程
System.out.println(getName() + " out at " + System.currentTimeMillis());
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new ChangeObjThread("t1");
Thread t2 = new ChangeObjThread("t2");
t1.start();
Thread.sleep(1000);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2); // unpark发生在park之前,导致park立即返回
//t2.start(); // 线程永久挂起
t1.join();
t2.join();
}
}
/*
t1 in at 1689681084053
t1 out at 1689681085053
t2 in at 1689681085053
t2 out at 1689681085053
*/
  • park() 方法挂起的线程不会像 suspend() 方法那样显示 RUNNABLE 状态
  • 如果使用 park(Object) 方法,可以为当前线程设置一个阻塞对象,该对象也会出现在 Dump 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// park();
"main" #1 prio=5 os_prio=0 tid=0x0000000002f72800 nid=0x1864 waiting on condition [0x0000000002a0f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at com.lb.juc.t02.Main.main(Main.java:13)

// park(new Main());
"main" #1 prio=5 os_prio=0 tid=0x0000000002a12800 nid=0x5128 waiting on condition [0x000000000290f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006eb84f298> (a com.lb.juc.t02.Main)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.lb.juc.t02.Main.main(Main.java:13)
  • park() 方法支持中断,但不会抛出 InterruptedException 异常,而是直接返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LockSupportTest2 {
static Object obj = new Object();

static class ChangeObjThread extends Thread {
public ChangeObjThread(String name) {
super.setName(name);
}

@Override
public void run() {
synchronized (obj) {
System.out.println(getName() + " in");
LockSupport.park(); // 阻塞当前线程
if (Thread.interrupted()){
System.out.println(getName() + " interrupted");
}
System.out.println(getName() + " done");
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new ChangeObjThread("t1");
Thread t2 = new ChangeObjThread("t2");
t1.start();
Thread.sleep(1000);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}
/*
t1 in
t1 interrupted
t1 done
t2 in
t2 done
*/

八、RateLimiter 限流

  • Guava 是 Google 下的一个核心库,提供了一大批设计精良、使用方便的工具类
  • 任何应用和模块组件都有一定的访问速率上限,如果请求速率突破了这个上限,不但多余的请求无法处理,甚至会压垮系统使所有的请求均无法有效处理。因此,对请求进行限流是非常必要的。RateLimiter 正是这么一款限流工具
  • 一种简单的限流算法就是给出一个单位时间,然后使用一个计数器 counter 统计单位时间内收到的请求数量,当请求数量超过上限时,余下的请求丢弃或者等待。但这种算法很难控制边界时间上的请求

bad-limiter

  • 常用的限流算法有两种:漏桶算法和令牌桶算法
  • 漏桶算法利用一个缓存区,当有请求进入系统时,无论请求的速率如何,都先在缓存区内保存,然后以固定的流速流出缓存区进行处理
  • 漏桶算法的特点:无论外部请求压力如何,漏桶算法总是以固定的流速处理数据。漏桶的容积和流出速率是该算法的两个重要参数

leaky-bucket

  • 令牌桶算法是一种反向的漏桶算法,桶中存放的不再是请求,而是令牌。处理程序只有拿到令牌后,才能对请求进行处理。如果没有令牌,那么处理程序要么丢弃请求,要么等待可用的令牌
  • 为了限制流速,该算法会在每个单位时间产生一定量的令牌存入桶中(不会超过桶的容量上限)

token-bucket

  • RateLimiter 采用了令牌桶算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RateLimiterTest {
static RateLimiter limiter = RateLimiter.create(2); // 每秒处理两个请求(500ms产生一个令牌)

public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
//limiter.acquire(); // 过剩的流量会在此等待,直到有机会执行
if (!limiter.tryAcquire()) { // 过剩的流量直接丢弃(没请求到令牌直接放回false,不阻塞)
continue;
}
System.out.println(i + " " + System.currentTimeMillis());
}
}
}
/*
0 1689682821586
1 1689682822050
2 1689682822549
3 1689682823061
4 1689682823549
5 1689682824048
*/