用wait()/notify()实现生产者消费者问题

这3个方法的作用:
wait:线程自动释放其占有的对象锁,并等待notify
notify:唤醒一个正在wait当前对象锁的线程,并让它拿到对象锁
notifyAll:唤醒所有正在wait前对象锁的线程

notify和notifyAll的最主要的区别是:notify只是唤醒一个正在wait当前对象锁的线程,而notifyAll唤醒所有。值得注意的是:notify是本地方法,具体唤醒哪一个线程由虚拟机控制;notifyAll后并不是所有的线程都能马上往下执行,它们只是跳出了wait状态,接下来它们还会是竞争对象锁。

注意:

  1. 永远在synchronized的函数或对象里使用wait、notify和notifyAll,不然Java虚拟机会生成 IllegalMonitorStateException。

  2. 永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。

  3. 永远在多线程间共享的对象(在生产者消费者模型里即缓冲区队列)上使用wait。

  4. notify 仅仅通知一个线程,并且我们不知道哪个线程会收到通知,然而 notifyAll 会通知所有等待中的线程。换言之,如果只有一个线程在等待一个信号灯,notify和notifyAll都会通知到这个线程。但如果多个线程在等待这个信号灯,那么notify只会通知到其中一个,而其它线程并不会收到任何通知,而notifyAll会唤醒所有等待中的线程。所以更倾向用 notifyAll(),而不是 notify()。

  5. 调用wait()/notify()都是在synchronized代码块中,已经拿到对象锁。A线程调用wait()之后A线程释放锁,A线程进入等待状态;B线程调用notify()之后唤醒一个正在wait对象锁的线程,只有A在等待就唤醒A,B不会马上释放锁,要到synchronized代码块退出之后才释放锁。

使用wait和notify函数的规范代码模板:

1
2
3
4
5
6
7
8
// The standard idiom for calling the wait method in Java 
synchronized (sharedObject) {
while (condition) {
sharedObject.wait();
// (Releases lock, and reacquires on wakeup)
}
// do action based upon condition e.g. take or put into queue
}

我们有两个线程,分别名为PRODUCER(生产者)和CONSUMER(消费者),他们分别继承了Producer和Consumer类,而Producer和Consumer都继承了Thread类。Producer和Consumer想要实现的代码逻辑都在run()函数内。Main线程开始了生产者和消费者线程,并声明了一个LinkedList作为缓冲区队列(在Java中,LinkedList实现了队列的接口)。生产者在无限循环中持续往LinkedList里插入随机整数直到LinkedList满。我们在while(queue.size == maxSize)循环语句中检查这个条件。请注意到我们在做这个检查条件之前已经在队列对象上使用了synchronized关键词,因而其它线程不能在我们检查条件时改变这个队列。如果队列满了,那么PRODUCER线程会在CONSUMER线程消耗掉队列里的任意一个整数,并用notify来通知PRODUCER线程之前持续等待。在我们的例子中,wait和notify都是使用在同一个共享对象上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package array;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

public class ProducerConsumer {
public static void main(String args[]) {
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start();
consumer.start();
}
}

class Producer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Producer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.size() == maxSize) {
System.out.println("Queue is full, Producer thread waiting for consumer to take something from queue");
try {
queue.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
Random random = new Random();
int i = random.nextInt();
System.out.println("Producing value : " + i);
queue.add(i);
queue.notifyAll();
}
}
}
}

class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;

public Consumer(Queue<Integer> queue, int maxSize, String name) {
super(name);
this.queue = queue;
this.maxSize = maxSize;
}

@Override
public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("Queue is empty, Consumer thread is waiting for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Consuming value : " + queue.remove());
queue.notifyAll();
}
}
}
}
谢谢小天使请我吃糖果
0%