【实战分享】有哪几种实现生产者消费者模式的方法?

邓敏 2021年12月09日 71次浏览

什么是生产者消费者模式

生产者消费者模式其实是一种设计模式,在生活中四处可见,比如我们在排队买奶茶,奶茶店里面的店员去生产奶茶,然后给消费者消费,在这里,店里面的店员就是一个生产者,顾客就是一个消费者。并且在生产者消费者的概念中,生产者和消费者是一一对应的,也就是说,奶茶店里面的店员生产了一杯奶茶只能供一个顾客,顾客想和第二杯也不行,就需要重新排队。但是如果当天奶茶店的老板打了鸡血给店员们发了奖金说今天要做1000杯奶茶,导致做奶茶太快,老板发现排队的人消费不了这么多的奶茶,俗称“产能过剩”,这个时候就需要老板来调度店员先停一停,等店里做好的奶茶先消费完,同时让店员去店门口吆客。那其实在这里就其实以阻塞队列的形式形成的生产者消费者模型。
image.png

可以看到上面这个图,生产者去生产奶茶,生产到了1000杯的时候停止生产,此时消费者看到奶茶店有奶茶就会过来消费,当1000杯奶茶都杯消费完了之后,店员就会告诉消费者,让他稍微等等,消费者也会告诉生产者你们的奶茶不够1000杯啦可以继续生产。

相信通过奶茶店卖奶茶的例子大家能够对生产者消费者模式有一定的了解,总结的来说,生产者消费者模式起到的最重要的作用就是能够让生产者生产的东西可以有缓存起来让消费者慢慢消费。

那么在我们工作当中,如何使用技术来实现这样的一种生产者消费者模式呢?

实现方式

使用 BlockingQueue 实现生产者消费者模式

public class BlockingQMain {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(()->{
            int i = 0;
            while (true){
                try {
                    //奶茶店每隔5秒生产一杯奶茶
                    TimeUnit.SECONDS.sleep(5);
                    queue.put(++i);
                    System.out.println("生产者生产了第"+i+"杯奶茶,当前门店还有"+queue.size()+"杯奶茶待消费");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumer = new Thread(()->{
            while (true){
                try {
                    //消费者随机排队,随机1到9秒出现一个消费者
                    Random random = new Random();
                    int seconds = random.nextInt(10)+1;
                    TimeUnit.SECONDS.sleep(seconds);
                    System.out.println("过了"+seconds+"秒之后来了一位消费者");
                    Integer msg = queue.take();
                    System.out.println("消费者拿到了第"+msg+"杯奶茶");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        consumer.start();
    }
}

先简单的带大家了解一下BlockingQueue, BlockingQueue是一个阻塞队列,其作用就是在队列满的时候,生产者会阻塞不再生产,等队列有空闲位置的时候才去生产。消费等队列有数据时消费,如果没有数据则会阻塞等待。而且他也是一种先进先出的队列
在上面的例子中我们实现了奶茶店的场景,奶茶店会每隔五秒钟生产一杯奶茶,而出现消费者去排队会随机1-9秒出现一个。并起了两个线程去实现它,从上面的代码来看,貌似是一个很简单的实现,但实际上BlockingQueue在里面起到了很重要的作用,如果队列满了就去阻塞生产者的线程,队列有空就去唤醒生产者的线程。

使用 Condition 实现生产者消费者模式

关于Condition 可以先看下这篇文章简单入门一下
Condition类的介绍与使用

其实使用Condition来实现生产者消费模式原理跟上面BlockingQueue来实现的原理差不多,区别在于BlockingQueue这个阻塞队列需要我们自己去用Condition来实现他的功能,那么BlockingQueue会有哪些功能是需要我们用到的呢?

  1. 可以缓存消息
  2. 保证消息先入先出
  3. 可以设置队列最大限制
  4. 队列为空时,消费方法阻塞。
  5. 队列到达最大值时,生产者阻塞
  6. 队列有值时通知消费者消费
  7. 队列没有到达最大值时通知生产者生产者生产

实现代码如下

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueue {

    private Lock lock = new ReentrantLock();

    private Condition producer = lock.newCondition();

    private Condition consumer = lock.newCondition();

    private Queue<String> queue;

    private int max;

    public ConditionQueue(int size) {
        max = size;
        queue = new LinkedList<String>();
    }

    public void put(String msg){
        try {
            lock.lock();
            //如果队列满了则阻塞生产
            while (queue.size() == max){
                System.out.println("生产者停止生产了");
                producer.await();
                System.out.println("生产者开始生产了");
            }
            queue.add(msg);
            //队列有数据了,唤醒消费
            consumer.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public String take() throws InterruptedException {
        lock.lock();
        try {
            //如果队列空了则阻塞消费
            while (queue.isEmpty()) {
                System.out.println("消费者停止消费了");
                consumer.await();
                System.out.println("消费者开始消费了");
            }
            String msg = queue.remove();
            //队列消费有空位了,唤醒生产者生产
            producer.signal();
            return msg;
        } finally {
            lock.unlock();
        }
    }

    public int size(){
        return queue.size();
    }


    public static void main(String[] args) {
        ConditionQueue queue = new ConditionQueue(10);
        Thread producer = new Thread(()->{
            int i = 0;
            while (true){
                try {
                    //奶茶店每隔1秒生产一杯奶茶
                    TimeUnit.SECONDS.sleep(1);
                    queue.put(String.valueOf(++i));
                    System.out.println("生产者生产了第"+i+"杯奶茶,当前门店还有"+queue.size()+"杯奶茶待消费");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumer = new Thread(()->{
            while (true){
                try {
                    //消费者随机排队,随机1到9秒出现一个消费者
                    Random random = new Random();
                    int seconds = random.nextInt(10)+1;
                    TimeUnit.SECONDS.sleep(seconds);
                    System.out.println("过了"+seconds+"秒之后来了一位消费者");
                    String msg = queue.take();
                    System.out.println("消费者拿到了第"+msg+"杯奶茶");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        consumer.start();
    }
}

可能大家看到代码后会对队列中while (queue.isEmpty())和while (queue.size() == max)产生疑问,感觉这里可以使用if语句来替换while来使用。其实如果用if来判断的话,可以适用于单线程的场景下,但是用到多线程的场景下就不适合了,如果有两个线程来充当消费者,当队列中没有数据之后就会发现,需要进行等待,此时可能会发生两个线程同时会进入到if判断中,如果第一个消费者消费到了数据,此时队列又变空了,并且生产者也没有生产数据。同时第二个线程也被唤醒了也去消费数据,但是队列中的数据变成了空的,再去取数据就会抛出NoSuchElementException异常。单如果换成while的好处就是,我被唤醒,但是我还是会再去走一遍循环,去判断队列中是否还有数据,从而就避免了上面抛异常的这种情况。说白了,while比if更加谨慎,就算你唤醒了我,我还是会再去检查一遍有没有问题,如果没有问题我再走下面的逻辑。

用 wait/notify 实现生产者消费者模式

如果你已经理解了上面Condition的方式去实现生产者消费者模式,那你应该就可以很容易发现,用wait/notify和的方式大同小异,他两实现的方式其实就是一个兄弟关系。其实这个也说的通Condition的发明就是为了替代wait/notify方式的。我们接下来看使用wait/notify去如何实现。不啰嗦,直接贴代码

public class WaitQueue {

    private LinkedList<String> queue;

    private int max;

    public WaitQueue(int size) {
        max = size;
        queue = new LinkedList<String>();
    }

    public synchronized void put(String msg) throws InterruptedException {
        //如果队列满了则阻塞生产
        while (queue.size() == max){
            System.out.println("生产者停止生产了");
            wait();
            System.out.println("生产者开始生产了");
        }
        queue.add(msg);
        //队列有数据了,唤醒消费
        notifyAll();
    }

    public synchronized String take() throws InterruptedException {
            //如果队列空了则阻塞消费
            while (queue.isEmpty()) {
                System.out.println("消费者停止消费了");
                wait();
                System.out.println("消费者开始消费了");
            }
            String msg = queue.remove();
            //队列消费有空位了,唤醒生产者生产
            notifyAll();
            return msg;
    }

    public int size(){
        return queue.size();
    }


    public static void main(String[] args) {
        ConditionQueue queue = new ConditionQueue(10);
        Thread producer = new Thread(()->{
            int i = 0;
            while (true){
                try {
                    //奶茶店每隔5秒生产一杯奶茶
                    TimeUnit.SECONDS.sleep(1);
                    queue.put(String.valueOf(++i));
                    System.out.println("生产者生产了第"+i+"杯奶茶,当前门店还有"+queue.size()+"杯奶茶待消费");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumer = new Thread(()->{
            while (true){
                try {
                    //消费者随机排队,随机1到9秒出现一个消费者
                    Random random = new Random();
                    int seconds = random.nextInt(10)+1;
                    TimeUnit.SECONDS.sleep(seconds);
                    System.out.println("过了"+seconds+"秒之后来了一位消费者");
                    String msg = queue.take();
                    System.out.println("消费者拿到了第"+msg+"杯奶茶");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.start();
        consumer.start();
    }
}

这里的代码我就不多解释了,其实逻辑跟Condition去实现一个阻塞队列的逻辑一样,只是换成了用wait和notify的方式。

#结束语
如果有看不懂的地方或者有不对的地方,欢迎下方留言评论。一起进步~