Java 线程之间的通信 —— 等待和通知,生产者消费者示例

Dec 3, 2021 阅读(773)

标签: Java

wait(), notify() 与 notifyAll() 是 Object 定义的方法, 可以通过这3个方法控制线程是否对象的锁定,或者通知线程参与锁定竞争。


线程进入 synchronized 范围前,要先取得指定对象的锁定。执行 synchronized 范围的程序代码期间,若调用锁定对象的 wait() 方法,线程会释对象放锁,并进入对象等待集合而处于阻断状态,其他线程可以竞争对象锁定,取得锁的线程可以执行 synchronized 范围的程序代码。


放进等待集合的线程不会参与 CPU 排版, wait() 可以指定等待时间,指定的时间到了之后线程会再次加入排班,如果指定时间为0或不指定 ,则线程会持续等待,直到被中断(调用 interrupt())或被通知(notify())可以参与排班。


被竞争锁定的对象在调用 notify() 时,会从对象等待集合中随机通知一个线程加入排班,再次执行 synchronized 前,被通知的线程会与其他线程共同竞争对象锁定;如果调用 notifyAll() ,所有等待集合中的线程都会被通知与排班,这些线程会与其他线程共同竞争对象的锁定。

image.png

举个简单生产者消费者的例子,如下:

Producer.java

package threads.producer.consumer;

/**
 * 生产者
 */
public class Producer implements Runnable {

    private Store store;

    public Producer(Store store) {
        this.store = store;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                String product = String.format("最新款 %d 号手机", i+1);
                store.setProduct(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


Consumer.java

package threads.producer.consumer;

/**
 * 消费者
 */
public class Consumer implements Runnable {

    private Store store;

    public Consumer(Store store) {
        this.store = store;
    }

    @Override
    public void run() {
        while(true){
            try {
                store.getProduct();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


Store.java

package threads.producer.consumer;

/**
 * 商店类
 *
 * wait(), notify() 与 notifyAll() 是 Object 定义的方法, 可以通过这3个方法控制线程是否对象的锁定,或者通知线程参与锁定竞争。
 */
public class Store {

    // 商店持有的商品
    private String product;

    // 接收商品
    public synchronized void setProduct(String product) throws InterruptedException {
        // 有商品则等待(不接收)
        waitIfExists();

        // 接收商品
        System.out.println("商品进新商品: " + product);
        this.product = product;

        // 通知等待的线程(例如消费者)
        notify();
    }

    // 如果商店有商品则等待
    private synchronized void waitIfExists() throws InterruptedException {
        while (null != this.product) wait();
    }

    // 卖出商品
    public synchronized void getProduct() throws InterruptedException {
        // 没有商品卖等待
        waitIfEmpty();

        // 卖出商品
        System.out.println("已卖出商品: " + product);
        this.product = null;

        // 通知等待线程(例如生产者)
        notify();
    }

    // 如果商店没有商品则等待
    private synchronized void waitIfEmpty() throws InterruptedException {
        while (null == this.product) wait();
    }

}


ProducerConsumerDemo.java

package threads.producer.consumer;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        Store store = new Store();
        new Thread(new Producer(store)).start();
        new Thread(new Consumer(store)).start();
    }
}

运行效果:

image.png

MongoDB学习园