用wait()/notify()实现生产者消费者问题

这3个方法的作用:
wait:线程自动释放其占有的对象锁,并等待notify
notify:唤醒一个正在wait当前对象锁的线程,并让它拿到对象锁
notifyAll:唤醒所有正在wait前对象锁的线程

notify和notifyAll的最主要的区别是:notify只是唤醒一个正在wait当前对象锁的线程,而notifyAll唤醒所有。值得注意的是:notify是本地方法,具体唤醒哪一个线程由虚拟机控制;notifyAll后并不是所有的线程都能马上往下执行,它们只是跳出了wait状态,接下来它们还会是竞争对象锁。

注意:

  1. 永远在synchronized的函数或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成 IllegalMonitorStateException。

  2. 永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。

  3. 永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。

  4. notify 仅仅通知一个线程,并且我们不知道哪个线程会收到通知,然而 notifyAll 会通知所有等待中的线程。换言之,如果只有一个线程在等待一个信号灯,notify和notifyAll都会通知到这个线程。但如果多个线程在等待这个信号灯,那么notify只会通知到其中一个,而其它线程并不会收到任何通知,而notifyAll会唤醒所有等待中的线程。所以更倾向用 notifyAll(),而不是 notify()。

  5. 调用wait()/notify()都是在synchronized代码块中,已经拿到对象锁。A线程调用wait()之后A线程释放锁,A线程进入等待状态;B线程调用notify()之后唤醒一个正在wait对象锁的线程,只有A在等待就唤醒A,B不会马上释放锁,要到synchronized代码块退出之后才释放锁。

使用wait和notify函数的规范代码模板:

1
2
3
4
5
6
7
8
// The standard idiom for calling the wait method in Java 
synchronized (sharedObject) {
while (condition) {
sharedObject.wait();
// (Releases lock, and reacquires on wakeup)
}
// do action based upon condition e.g. take or put into queue
}

我们有两个线程,分别名为PRODUCER(生产者)和CONSUMER(消费者),他们分别继承了Producer和Consumer类,而Producer和Consumer都继承了Thread类。Producer和Consumer想要实现的代码逻辑都在run()函数内。Main线程开始了生产者和消费者线程,并声明了一个LinkedList作为缓冲区队列(在Java中,LinkedList实现了队列的接口)。生产者在无限循环中持续往LinkedList里插入随机整数直到LinkedList满。我们在while(queue.size == maxSize)循环语句中检查这个条件。请注意到我们在做这个检查条件之前已经在队列对象上使用了synchronized关键词,因而其它线程不能在我们检查条件时改变这个队列。如果队列满了,那么PRODUCER线程会在CONSUMER线程消耗掉队列里的任意一个整数,并用notify来通知PRODUCER线程之前持续等待。在我们的例子中,wait和notify都是使用在同一个共享对象上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

public class MyProducerConsumer {
public static void main(String[] args) {
// 生产者消费者的缓冲区
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer1 = new Producer(buffer, maxSize, "producer1");
Thread producer2 = new Producer(buffer, maxSize, "producer2");
Thread consumer1 = new Consumer(buffer, maxSize, "consumer1");
Thread consumer2 = new Consumer(buffer, maxSize, "consumer2");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}

class Producer extends Thread {
private Queue<Integer> buffer;
private int maxSize;
private Random random = new Random();

public Producer(Queue<Integer> buffer, int maxSize, String name) {
super(name);
this.buffer = buffer;
this.maxSize = maxSize;
}

@Override
public void run() {
// 一直会尝试去抢buffer的对象锁
while (true) {
synchronized (buffer) {
// 进入了synchronized代码区则表明已经抢到了锁
// 缓冲区为满,放不下
while (buffer.size() == maxSize) {
System.out.println("Buffer is full, cannot produce!");
// 生产者释放锁,等待被notify
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 随便生产一个10以内的数放入缓冲区
int tmp = random.nextInt(10);
buffer.add(tmp);
System.out.print(this.getName() + " produces value : " + tmp + ", size of buffer = " + buffer.size() + ", buffer = ");
for (Integer i : buffer) {
System.out.print(i + ",");
}
System.out.println();
// 唤醒在wait的线程,准备释放buffer对象锁
buffer.notifyAll();
}
}
}
}

class Consumer extends Thread {
private Queue<Integer> buffer;
private int maxSize;

public Consumer(Queue<Integer> buffer, int maxSize, String name) {
super(name);
this.buffer = buffer;
this.maxSize = maxSize;
}

@Override
public void run() {
// 一直尝试去抢buffer的对象锁
while (true) {
synchronized (buffer) {
// 进入了synchronized代码区则表明已经抢到了锁
while (buffer.isEmpty()) {
System.out.println("Buffer is empty, cannot consume!");
// 消费者线程被阻塞,释放锁
try {
buffer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从末端消费一个
System.out.print(this.getName() + " consumes value : " + buffer.remove() + ", size of buffer = " + buffer.size() + ", buffer = ");
for (Integer i : buffer) {
System.out.print(i + ",");
}
System.out.println();
// notify在wait的被阻塞的线程,准备释放buffer对象锁
buffer.notifyAll();
}
}
}
}

两个线程交替打印奇数和偶数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Print1234 {
public static void main(String[] args) {
Printer printer = new Printer();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
printer.printOdd();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
printer.printEven();
}
}
}).start();
}
}

class Printer {
private boolean flag = true;
private int odd = 1;
private int even = 2;

/**
* flag为true打印奇数
*/
public synchronized void printOdd() {
while (!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(odd);
odd += 2;
flag = false;
this.notify();
}

/**
* flag为false打印偶数
*/
public synchronized void printEven() {
while (flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(even);
even += 2;
flag = true;
this.notify();
}
}
谢谢小天使请我吃糖果
0%