Object::wait|notify

Object::wait 最佳实践

Context

Object::wait 的方法注释中有这样一句:

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:

1
2
3
4
5
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}

大意是 Object::wait 的最佳实践是放在循环体中使用,循环体之后是获取锁资源后真实执行的同步代码。

与此同时,我们在网上找到好多使用 if 代替 while 相关的 wait 代码,比如 java JUC 中 Object里wait()、notify() 实现原理及实战讲解Java 多线程编程之:notify 和 wait 用法。当然,也有讨论为什么 wait 要放在 loop 中使用的文章:论Object.wait()要放到while循环里。文章都能看懂,但以前两篇文章为例,用 if 好像也没什么问题。

抛出问题

接下来,以下题为例讨论一下:

启动两个线程, 一个输出 1,3,5,7…99,另一个输出 2,4,6,8…100, 最后 STDOUT 中按序输出 1,2,3,4,5…100。

解题

题目中的题眼在于『按序输出 1,2,3,4,5…100』中的『按序』。如果单单是启动两个线程,一个输出奇数,一个输出偶数,很简单。只要控制好打印条件i%2即可,但输出顺序无法保证,有可能得到

1 3 5 2 4 7…

所以需要引入互斥资源,保证交替打印。以下是我给出的第一份解法:👇🏻

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 独占式对象资源
public static Object r1 = new Object();

// 代码需要 main 方法中执行,不要使用 @Test
public static void main(String[] arg){
new Thread(() -> {
synchronized (r1){
for (int i = 1; i<= 100; ++i){
if(i%2 != 0){
// 输出单数
System.out.println("t1: " + i);
try {
// 通知输出双数
r1.notifyAll();
// 当前线程主动阻塞,最后一次不阻塞
if(i != 99){
r1.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}, "t1").start();

new Thread(() -> {
synchronized (r1){
for (int i = 1; i<= 100; ++i){
// 输出双数
if(i%2 == 0){
System.out.println("t2: " + i);
// 通知单数执行
r1.notifyAll();
// 最后一次不阻塞
if(i != 100){
try {
r1.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}, "t2").start();
}

代码中在执行 wait 语句时使用的是 if ,最后输出的结果没啥问题。

我们将for 循环体中的逻辑改成范式试试看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 引入公共变量
public static int i = 1;

// t1
for (; i<= 100; ++i){
// i 为偶数则阻塞
while(i%2 == 0){
try {
r1.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i<=100){
System.out.println("t1: " + i);
}
r1.notifyAll();
}

// t2
for (; i<= 100; ++i){
// i 为奇数,则阻塞
while(i%2 != 0){
try {
r1.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i <= 100){
System.out.println("t2: " + i);
}
r1.notifyAll();
}

改造后,运行结果与原来无异。以上👆🏻面的改造为基础,单纯把 while 改造成 if 对结果也是没影响的。为什么呢?

因为只有两个线程,且每次 i++ 都会满足另外一个线程中同步代码块的条件,完全不存在 虚假唤醒 !此时,我们需要像魏成一样,引入第三个球体。

引入第三个变量

此时,题目由原来的两个线程变成三个线程,同时启动,交替打印t1:1 t2:2 t3:3 t4:4 t5:5 t6:6… 即,3*+1 3*+2 3*+3

mark:10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public static Object r1 = new Object();
// 引入公共变量
public static int i = 1;

public static void main(String[] arg){
new Thread(() -> {
synchronized (r1){
System.out.println("t1 get r1. i= " + i);
for (; i<= 100; ++i){
if(i%3 != 1){
try {
System.out.println("t1 before wait.");
r1.wait();
System.out.println("t1 waited.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(if<=100){
System.out.println("t1: " + i);
}
r1.notifyAll();
System.out.println("t1 after notify i: " + i);
}
}
}, "t1").start();

new Thread(() -> {
synchronized (r1){
System.out.println("t2 get r1. i= " + i);
for (; i<= 100; ++i){
if(i%3 != 2){
try {
System.out.println("t2 before wait.");
r1.wait();
System.out.println("t2 waited.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i <= 100){
System.out.println("t2: " + i);
}
r1.notifyAll();
System.out.println("t2 after notify i: " + i);
}
}
}, "t2").start();

new Thread(() -> {
synchronized (r1){
System.out.println("t3 get r1. i= " + i);
for (; i<= 100; ++i){
if(i%3 != 0){
try {
System.out.println("t3 before wait.");
r1.wait();
System.out.println("t3 waited.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(i <= 100){
System.out.println("t3: " + i);
}
r1.notifyAll();
System.out.println("t3 after notify i: " + i);
}
}
}, "t3").start();
}

以上我们使用的是 if ,结果如下:

t1 get r1. i= 1
t1: 1
t1 after notify i: 1
t1 before wait. // t1 阻塞,进入monitor WAIT_SET 条件等待队列
t2 get r1. i= 2
t2: 2
t2 after notify i: 2 // t2 唤醒所有线程,t1 进入 ENTRY_SET 入口等待队列
t2 before wait. // t2 阻塞
t1 waited. // t1获取独占锁,但此时并不满足 i%3 != 1条件,不应向下执行。 ❌
t1: 3 // 我们的期望是 t3 输出 3 ❌
t1 after notify i: 3
t1: 4
t1 after notify i: 4
t1 before wait.

此时将上述 wait 代码块中的 if 改为 while 可完美解决问题。

可能有同学说了,那我把三个循环都改成这样,成不成啊?👇🏻

1
2
3
4
5
6
7
8
9
10
11
12
for (; i<= 100; ++i){
if(i%3 == 1){
System.out.println("t1: " + i);
}
try {
r1.notifyAll();
// 当前线程主动阻塞,最后一次不阻塞
r1.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

那肯定不成啊。能保证三个线程依次获取锁资源么?妥妥不能,为啥?synchronized 是非公平锁喔~最后输出的没准儿是这样:

t1: 1
t1: 4
t1: 7
t3: 12
t1: 13
t2: 14

最终的是,wait 代码放最后,打印结束之后谁来唤醒呢?最后三个线程都一直处于阻塞状态,服务器要崩…

扩展:使用 ReentrantLock 和 Condition 该如何实现呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
ReentrantLock lock = new ReentrantLock();
Condition c1 = lock.newCondition();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 1){
c1.await();
}
if(i<=100){
System.out.println("t1: " + i);
}
c1.signalAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 2){
c1.await();
}
if(i<=100){
System.out.println("t2: " + i);
}
c1.signalAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 0){
c1.await();
}
if(i<=100){
System.out.println("t3: " + i);
}
c1.signalAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

或者使用 signal 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
ReentrantLock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 1){
c1.await();
}
if(i<=100){
System.out.println("t1: " + i);
}
c2.signal();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 2){
c2.await();
}
if(i<=100){
System.out.println("t2: " + i);
}
c3.signal();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lockInterruptibly();
for (; i<=100; i++){
while(i%3 != 0){
c3.await();
}
if(i<=100){
System.out.println("t3: " + i);
}
c1.signal();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();