JUC——线程基础

一、线程基础

  • 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体
  • 线程就是轻量级进程,是程序执行的最小单位。使用多线程而不是用多进程去进行并发程序的设计,是因为线程间的切换和调度的成本远远小于进程

status

二、线程的基本操作

1. 新建线程(new)

  • 匿名类
  • 继承 Thread
  • 实现 Runnable
1
2
3
4
5
6
7
8
Thread t1 = new Thread() {
@Override
public void run() {
System.out.println("hi");
}
};
t1.start(); // 会新建一个线程并让这个线程执行run()方法
t1.run(); // 在当前线程中串行执行
  • 底层代码:Thread.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Thread implements Runnable {

private Runnable target;

public Thread(Runnable target) {
// ...
}

public void run() {
if (target != null) {
target.run();
}
}
}

2. 终止线程(stop

  • 一般来说,线程执行完毕就会结束,无须手工关闭。但有些后台线程可能会常驻系统,用以提供某些服务
  • stop() 停止线程的缺点:已废弃,过于暴力,强行把执行到一半的线程终止,并立即释放这个线程持有的锁,可能引起数据不一致的问题
  • stop() 案例:写线程写到一半时强行终止,锁被释放,另一个线程读到的数据就可能有问题

stop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class StopThreadUnsafe {

private static final User user = new User();

public static void main(String[] args) {
new ReadUserThread().start();
while (true) {
WriteUserThread writeUserThread = new WriteUserThread();
writeUserThread.start();
ThreadUtil.sleep(150);
writeUserThread.stop();
}

}

@Data
@ToString
static class User {
int id = 0;
String name = "0";
}

static class WriteUserThread extends Thread {
@Override
public void run() {
while (true) {
synchronized (user) {
int id = (int) (System.currentTimeMillis() / 1000);
user.setId(id);
ThreadUtil.sleep(100);
user.setName(String.valueOf(id));
}
Thread.yield(); // 当前线程主动放弃CPU时间片
}
}
}

static class ReadUserThread extends Thread {
@Override
public void run() {
while (true) {
synchronized (user) {
if (user.getId() != Integer.parseInt(user.getName())) {
System.out.println(user);
}
}
Thread.yield();
}
}
}
}
// StopThreadUnsafe.User(id=1688897205, name=1688897204)
// StopThreadUnsafe.User(id=1688897210, name=1688897209)
  • 建议方法:自行决定线程何时退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class WriteUserThread extends Thread {
volatile boolean running = true;

public void stopMe() {
running = false;
}

@Override
public void run() {
while (true) {
if (!running) {
System.out.println("exit by stopMe()");
break;
}
synchronized (user) {
int id = (int) (System.currentTimeMillis() / 1000);
user.setId(id);
ThreadUtil.sleep(100);
user.setName(String.valueOf(id));
}
Thread.yield(); // 当前线程主动放弃CPU时间片
}
}
}

3. 线程中断(interrupt)

  • 线程中断并不会使线程立即退出,而是给线程发送一个通知,至于目标线程接到通知后如何处理,则完全由目标线程自行决定
  • 线程中断相关的方法:
1
2
3
public void Thread.interrupt();  // 通知目标线程中断,并设置中断标志位
public boolean Thread.isInterrupted(); // 判断线程是否中断,检查中断标志位
public static boolean Thread.interrupted(); // 判断线程是否中断,并清除当前中断标志位状态
  • 中断案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("thread interrupted");
break;
}
Thread.yield();
}
}
};

t1.start();
Thread.sleep(2000);
t1.interrupt();
}
  • Thread.sleep() 会让当前线程休眠若干时间,它会抛出 InterruptedException 中断异常,它不是运行时异常,因此程序必须捕获并处理
  • 中断 sleep 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("thread interrupted");
break;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// 此时会清除中断标记位
System.out.println("thread interrupted when sleep");
// 重新设置中断状态:可能需要后续处理来保证数据一致性和完整性,下次循环被中断
Thread.currentThread().interrupt();
}

Thread.yield();
}
}
};

t1.start();
Thread.sleep(2000);
t1.interrupt();
}
// thread interrupted when sleep
// thread interrupted

4. 等待和通知(wait/notity)

  • 任何对象都可以使用这两个方法:
1
2
3
public final void wait() throws InterruptedException {...}
public final native void wait(long timeout) throws InterruptedException;
public final native void notify();
  • 如果线程 A 调用了 obj.wait() 方法,线程 A 就会停止继续执行,转为等待状态。线程 A 会进入 obj 对象的等待队列,等待队列中可能有多个线程
  • 当调用了 obj.notify() 方法,就会从等待队列中随机选择一个线程,并将其唤醒
  • 当调用了 obj.notifyAll() 方法,则会唤醒这个等待队列中的所有等待线程
  • obj 相当于多个线程之间的通信手段

wait

  • obj.wait()obj.notify() 方法并不能随便调用,必须包含在对应的 synchronized(obj) 语句中,即方法执行前都需要获得目标对象的锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class WaitAndNotify {

final static Object obj = new Object();

static class T1 extends Thread {
@Override
public void run() {
synchronized (obj) {
System.out.println("t1 start at " + System.currentTimeMillis());
try {
obj.wait(); // 线程等待,释放锁
Thread.sleep(2000); // 被notify后,尝试获取锁
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1 stop at " + System.currentTimeMillis());
}
}
}

static class T2 extends Thread {
@Override
public void run() {
synchronized (obj) {
System.out.println("t2 start at " + System.currentTimeMillis());
obj.notify(); // notify,等2s后释放锁
System.out.println("t2 notify at " + System.currentTimeMillis());
ThreadUtil.sleep(2000);
System.out.println("t2 stop at " + System.currentTimeMillis());
}
}
}

public static void main(String[] args) {
Thread t1 = new T1();
Thread t2 = new T2();
t1.start();
t2.start();
}
}
// t1 start at 1688903167355
// t2 start at 1688903167355
// t2 notify at 1688903167355
// t2 stop at 1688903169355
// t1 stop at 1688903171355
  • wait 和 sleep 区别:
    • wait 可被唤醒
    • wait 可释放目标对象的锁

5. 挂起和继续执行(suspend/resume

  • 已废弃:suspend() 方法在导致线程暂停的同时,并不会释放任何锁资源。直到对应的线程上执行了 resume() 操作,被挂起的线程才能继续
  • 案例:如果 resume() 方法意外的在 suspend() 方法前执行了,被挂起的线程就很难有机会被继续执行,且其占用的锁不会被释放

suspend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class BadSuspend {
static Object obj = new Object();

static class ChangeObjThread extends Thread {
public ChangeObjThread(String name) {
super.setName(name);
}

@Override
public void run() {
synchronized (obj) {
System.out.println(getName() + " in at " + System.currentTimeMillis());
Thread.currentThread().suspend();
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new ChangeObjThread("t1");
Thread t2 = new ChangeObjThread("t2");
t1.start();
Thread.sleep(1000);
t2.start();
t1.resume(); // t1释放锁
t2.resume(); // t2挂起前执行了resume
t1.join();
t2.join(); // 等待子进程结束,程序在此卡住,且永远占用obj的锁
}
}
// t1 in at 1688995419942
// t2 in at 1688995420942
  • 使用 jstack 打印系统线程信息,可以看到线程 t2 是被挂起的,但他的线程状态却是 RUNNABLE
1
2
3
4
5
6
7
$ jstack -l 3552
"t2" #21 prio=5 os_prio=0 tid=0x000000002c727800 nid=0x191c runnable [0x000000002f28f000]
java.lang.Thread.State: RUNNABLE
at java.lang.Thread.suspend0(Native Method)
at java.lang.Thread.suspend(Thread.java:1032)
at com.lb.juc.t02.BadSuspend$ChangeObjThread.run(BadSuspend.java:19)
- locked <0x00000006eb852c58> (a java.lang.Object)
  • 使用 wait()notify() 方法实现 suspend()resume() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class GoodSuspend {
static Object obj = new Object();

static class ChangeObjThread extends Thread {
volatile boolean suspended = false;

public void suspendMe() {
suspended = true;
}

public void resumeMe() {
suspended = false;
synchronized (this) {
notify();
}
}

@Override
public void run() {
while (true) {
synchronized (this) {
while (suspended) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
synchronized (obj) {
System.out.println("ChangeObjThread in at " + System.currentTimeMillis());
ThreadUtil.sleep(100);
}
Thread.yield();
}
}
}

static class ReadObjThread extends Thread {
@Override
public void run() {
while (true) {
synchronized (obj) {
System.out.println("ReadObjThread in at " + System.currentTimeMillis());
ThreadUtil.sleep(100);
}
Thread.yield();
}
}
}

public static void main(String[] args) {
ChangeObjThread t1 = new ChangeObjThread();
ReadObjThread t2 = new ReadObjThread();
t1.start();
t2.start();
ThreadUtil.sleep(1000);
t1.suspendMe();
ThreadUtil.sleep(2000);
t1.resumeMe();
}
}

6. 等待线程结束和谦让(join/yeild)

  • 如果一个线程的输入依赖于另一个或多个线程的输出,该线程就需要等待依赖线程执行完毕后,才能继续执行
1
2
3
4
// 无线等待,直到目标线程执行完毕
public final void join() throws InterruptedException {...}
// 给出最大等待时间
public final synchronized void join(long millis) throws InterruptedException {...}
  • join 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class JoinThread {
volatile static int i = 0;

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread() {
@Override
public void run() {
for (i = 0; i < 10000; i++);
}
};
t1.start();
t1.join(); // 主线程等待t1线程执行完毕
System.out.println(i);
}
}
// 10000
  • join() 方法的本质是让调用线程 wait() 在其对象实例上。线程在退出前会调用 notifyAll() 方法通知所有等待线程继续执行
  • 不要在 Thread 对象实例上使用 wait()notify() 方法,可能影响系统 API 正常工作
1
2
3
while (isAlive()) {
wait(0);
}
  • yield() 是一个静态方法,执行后会让当前线程让出 CPU。当前线程让出 CPU 后,仍会进行 CPU 资源的争夺
1
public static native void yield();

三、volatile 与 Java 内存模型(JMM)

  • 用 volatile 声明一个变量,如果变量被修改,则应用程序范围内的所有线程都能看到这个改动
  • volatile 不能替代锁,也无法保证一些复合元素操作的原子性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Visibility {
static boolean ready;
static int number;

public static void main(String[] args) {
Thread t1 = new Thread() {
@Override
public void run() {
while(!ready);
System.out.println(number);
}
};
t1.start();
ThreadUtil.sleep(1000);
number = 47;
ready = true;
ThreadUtil.sleep(2000);
}
}
  • 上述代码在虚拟机的 Client 模式下(64 位没有该模式),由于 JIT 没有做优化,线程能够发现改动,并退出程序。但在 Server 模式下将无法退出

四、线程组

  • 可以将相同功能的线程放在同一个线程组中
  • 线程组也有 stop() 方法,它会停止组中的所有线程,但其和 Thread.stop() 方法有相同的问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ThreadGroupTest implements Runnable {

public static void main(String[] args) {
ThreadGroup group = new ThreadGroup("TestGroup");
Thread t1 = new Thread(group, new ThreadGroupTest(), "T1");
Thread t2 = new Thread(group, new ThreadGroupTest(), "T2");
t1.start();
t2.start();
System.out.println(group.activeCount()); // 活动线程总数(估计值)
group.list(); // 打印线程组信息
}

@Override
public void run() {
String groupAndName = Thread.currentThread().getThreadGroup().getName()
+ "@" + Thread.currentThread().getName();
while (true) {
ThreadUtil.sleep(2000);
System.out.println(groupAndName);
}
}
}
/*
2
java.lang.ThreadGroup[name=TestGroup,maxpri=10]
Thread[T1,5,TestGroup]
Thread[T2,5,TestGroup]
TestGroup@T1
TestGroup@T2
TestGroup@T2
TestGroup@T1
*/

五、守护线程

  • 守护线程会在后台默默完成一些系统性服务,比如垃圾回收线程、JIT 线程等。用户线程是系统的工作线程,它会完成程序应该完成的业务操作。如果用户线程结束,意味着程序执行完毕,守护线程也会随之结束
  • 设置守护线程需要在线程 start() 之前设置,否则会抛出异常,但程序和线程依然能正常执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class DaemonTest {

public static void main(String[] args) {
Thread t1 = new Thread() {
@Override
public void run() {
while (true) {
System.out.println("t1 running");
ThreadUtil.sleep(1000);
}
}
};
t1.setDaemon(true); // 不设置或在start之后设置:线程会一直执行
t1.start();
ThreadUtil.sleep(2000);
}
}
// t1 running
// t1 running

六、线程优先级

  • 优先级高的线程在竞争资源时更有优势。线程的优先级调度和底层操作系统有密切的关系,在各个平台上表现不一,无法精准控制,因此仍需在应用层解决线程调度问题
  • Java 中使用 1-10 表示线程优先级,一般使用内置的三个静态变量表示,数字越大则优先级越高
1
2
3
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
  • 优先级测试案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class PriorityTest {
static class HighPriority extends Thread {
@Override
public void run() {
int count = 0;
while (true) {
synchronized (PriorityTest.class) {
if (++count > 1000000) {
System.out.println("HighPriority Finished");
break;
}
}
}
}
}

static class LowPriority extends Thread {
@Override
public void run() {
int count = 0;
while (true) {
synchronized (PriorityTest.class) {
if (++count > 1000000) {
System.out.println("LowPriority Finished");
break;
}
}
}
}
}

public static void main(String[] args) {
Thread t1 = new HighPriority();
Thread t2 = new LowPriority();
t1.setPriority(Thread.MAX_PRIORITY);
t2.setPriority(Thread.MIN_PRIORITY);
t2.start();
t1.start();
}
}
// HighPriority Finished
// LowPriority Finished

七、线程安全与 synchronized

  • volatile 无法保证线程安全,它只能确保一个线程修改数据后,其他线程能够看到这个改动。如果两个线程同时修改一个数据,就可能导致数据错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class IncreaseVol implements Runnable {

static volatile int i = 0;

@Override
public void run() {
for (int j = 0; j < 100000; j++) {
i++;
}
}

public static void main(String[] args) throws InterruptedException {
IncreaseVol instance = new IncreaseVol();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i); // 148624
}
}
  • synchronized 的作用是对同步的代码加锁,使得每次只有一个线程进入同步块,从而保证线程间的安全性
  • 使用方式:
    • 指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁
    • 直接作用于实例方法:对当前实例加锁,进入同步代码前要获得当前实例的锁
    • 直接作用于静态方法:对当前类加锁,进入同步代码前要获得当前类的锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class IncreaseSync implements Runnable {

static int i = 0;

@Override
public void run() {
for (int j = 0; j < 100000; j++) {
synchronized (this) {
i++;
}
}
}

public static void main(String[] args) throws InterruptedException {
IncreaseSync instance = new IncreaseSync();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i); // 200000
}
}

八、隐蔽的错误

1. 无提示的错误案例

1
2
3
int v1 = 1073741827;
int v2 = 1431655768;
System.out.println((v1 + v2) / 2); // -894784850

2. 并发下的 ArrayList

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ArrayListMultiThread {
static ArrayList<Integer> list = new ArrayList<>();
static class AddThread implements Runnable {

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
list.add(i);
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new AddThread());
Thread t2 = new Thread(new AddThread());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(list.size());
}
}
  • 由于多线程访问冲突,使得保存容器大小的变量被多线程不正常的访问。有三种结果:
    1. 程序正常结束,list 大小为 20000
    2. 程序抛出 ArrayIndexOutOfBoundsException 异常
    3. 程序正常结束,list大小不为 20000

3. 并发下的 HashMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class HashMapMultiThread {
static Map<String, String> map = new HashMap<>();
static class AddThread implements Runnable {
int start = 0;
public AddThread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < 10000; i+=2) {
map.put(Integer.toString(i), Integer.toBinaryString(i));
}
}
}

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new AddThread(0));
Thread t2 = new Thread(new AddThread(1));
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(map.size());
}
}
  • (JDK 7)有三种结果:
    1. 程序正常结束,map 大小为 10000
    2. 程序正常结束,map 大小不为 10000
    3. 程序无法结束(由于多线程冲突,链表结构遭到破环,链表成环。卡在 put 方法,遍历内部数据时死循环)

4. 错误的加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class LockOnInteger implements Runnable {
static Integer i = 0;

@Override
public void run() {
for (int j = 0; j < 10000; j++) {
synchronized (i) {
i++; // i = Integer.valueOf(i.intValue() + 1);
}
}
}

public static void main(String[] args) throws InterruptedException {
LockOnInteger instance = new LockOnInteger();
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i); // 14655
}
}
  • Java 中的 Integer 属于不变对象,即对象一旦被创建,就不可能被修改。因此 i++ 本质上时创建一个新的对象,并将其引用赋值给 i