当前位置:网站首页>多线程案例——阻塞式队列

多线程案例——阻塞式队列

2022-08-09 09:34:00 Living_Amethyst

本篇文章讲给大家带来有关 阻塞式队列 的有关知识
在这里插入图片描述

什么是阻塞式队列

阻塞队列是一种特殊的队列. 也遵守 "先进先出" 的原则.
阻塞队列是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素.
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素

阻塞队列的一个典型应用场景就是 “生产者消费者模型”. 这是一种非常典型的开发模型.

生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

  1. 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
  • 比如在 “秒杀” 场景下, 服务器同一时刻可能会收到大量的支付请求. 如果直接处理这些支付请求,
  • 服务器可能扛不住(每个支付请求的处理都需要比较复杂的流程).
  • 这个时候就可以把这些请求都放到一个阻塞队列中, 然后再由消费者线程慢慢的来处理每个支付请求.
  • 这样做可以有效进行 “削峰”, 防止服务器被突然到来的一波请求直接冲垮

一张图帮助理解
在这里插入图片描述
而当我们应用了阻塞队列之后
在这里插入图片描述
这时即使由很大规模的用户同时访问A,那么压力也不会给到B上
多出来的压力就有阻塞队列承担了,只要把数据在队列中多存放一会就可以了

  1. 阻塞队列也能使生产者和消费者之间 解耦
  • 比如过年一家人一起包饺子. 一般都是有明确分工, 比如一个人负责擀饺子皮, 其他人负责包.
  • 擀饺子皮的人就是 “生产者”, 包饺子的人就是 “消费者”.
  • 擀饺子皮的人不关心包饺子的人是谁(能包就行, 无论是手工包, 借助工具, 还是机器包),
  • 包饺子的人也不关心擀饺子皮的人是谁(有饺子皮就行, 无论是用擀面杖擀的, 还是拿罐头瓶擀, 还是直接从超市买的)

我们仍然是用一张图来帮助我们理解
在这里插入图片描述
如果是A直接给B发送数据,那么就是 耦合性比较强
在开发A的代码时需要考虑B是如何接收的
在开发B的代码时需要考虑A是如何发送的
此时要是加入了C,就需要修改A
而且如果A挂了,B很可能也要出问题
B挂了,A也可能出问题

我们现在再加入阻塞队列
在这里插入图片描述

  • AB不再直接交互
  • 开发阶段: A只考虑自己和队列如何交互, B只考虑自己和队列如何交互. A B之间甚至不需要知道对方的存在
  • 部署阶段,A和B中有一个挂了,对另一个也不影响
  • 此时加入C,A也不需要做任何调整

从上面两点我们可以看出应用了阻塞队列的生产者消费者模型有很大的作用

标准库中的阻塞队列

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 入队列
queue.put("abc");
// 出队列. 如果没有 put 直接 take, 就会阻塞.
String elem = queue.take();

生产者消费者模型

public class Demo15 {
    
    public static void main(String[] args) {
    
        BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();

        Thread customer = new Thread(()->{
    
           while(true){
    
               try {
    
                   int val = queue.take();
                   System.out.println("消费元素:"+val);
               } catch (InterruptedException e) {
    
                   e.printStackTrace();
               }
           }
        });
        customer.start();

        Thread producer = new Thread(()->{
    
           int n = 0;
           while(true){
    
               try {
    
                   System.out.println("生产元素:"+n);
                   queue.put(n);
                   n++;
                   Thread.sleep(500);  //每生产一个元素就休眠一会
               } catch (InterruptedException e) {
    
                   e.printStackTrace();
               }
           }
        });
        producer.start();
    }
}

阻塞队列模拟实现

这是一个循环队列

我们用一个数组来实现

在这里插入图片描述
我们先看看普通的队列怎么写

// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockQueue {
    
    // 假定最大1000个元素
    private int[] items = new int[1000];
    // 队首的位置
    private int head = 0;
    // 队尾的位置
    private int tail = 0;
    // 队列的元素个数
    private int size = 0;

    //入队列
    private void put(int value){
    
        synchronized (this){
    
            if(size == items.length){
    
                //队列已满 无法插入
                return;
            }
            items[tail] = value;
            tail++;
            if(tail == items.length){
    
                // 如果 tail 达到数组末尾,就需要从头开始
                tail = 0;
            }
            size++;
        }
    }

    //出队列
    public Integer take(){
    
        int ret = 0;
        synchronized (this){
    
            if(size == 0){
    
                //队列为空 无法出队列
                return null;
            }
            ret = items[head];
            head++;
            if(head == items.length){
    
                head=0;
            }
            size--;
        }
        return ret;
    }
}

我们要写的是阻塞队列

阻塞队列的两个特点:

  1. 线程安全(我们可以通过加锁方式实现)
  2. 阻塞(用wait):队列为空就阻塞不为空时唤醒,队列满时阻塞不满时唤醒
// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
    
    // 假定最大1000个元素
    private int[] items = new int[1000];
    // 队首的位置
    private int head = 0;
    // 队尾的位置
    private int tail = 0;
    // 队列的元素个数
    volatile private int size = 0;

    //入队列
   public void put(int value) throws InterruptedException {
    
        synchronized (this){
    
            while(size == items.length){
    
                //队列已满 无法插入
                this.wait();
            }
            items[tail] = value;
            tail++;
            if(tail == items.length){
    
                // 如果 tail 达到数组末尾,就需要从头开始
                tail = 0;
            }
            size++;
            //即使没人在等待 多调用几次 notify 也没啥副作用
            this.notify();  //当队列不空的时候 就唤醒
        }
    }

    //出队列
    public Integer take() throws InterruptedException {
    
        int ret = 0;
        synchronized (this){
    
            while (size == 0){
    
                //队列为空 ,就等待
                this.wait();
            }
            ret = items[head];
            head++;
            if(head == items.length){
    
                head=0;
            }
            size--;
            this.notify();//当队列不满的时候,就唤醒
        }
        return ret;
    }
}

然后基于这个阻塞队列写一个生产者消费者模型

// 模拟实现一个阻塞队列
// 基于数组的方式实现
// 提供两个核心方法:
// 1.put入队列 2.take出队列
class MyBlockingQueue {
    
    // 假定最大1000个元素
    private int[] items = new int[1000];
    // 队首的位置
    private int head = 0;
    // 队尾的位置
    private int tail = 0;
    // 队列的元素个数
    volatile private int size = 0;

    //入队列
   public void put(int value) throws InterruptedException {
    
        synchronized (this){
    
            while(size == items.length){
    
                //队列已满 无法插入
                this.wait();
            }
            items[tail] = value;
            tail++;
            if(tail == items.length){
    
                // 如果 tail 达到数组末尾,就需要从头开始
                tail = 0;
            }
            size++;
            //即使没人在等待 多调用几次 notify 也没啥副作用
            this.notify();  //当队列不空的时候 就唤醒
        }
    }

    //出队列
    public Integer take() throws InterruptedException {
    
        int ret = 0;
        synchronized (this){
    
            while (size == 0){
    
                //队列为空 ,就等待
                this.wait();
            }
            ret = items[head];
            head++;
            if(head == items.length){
    
                head=0;
            }
            size--;
            this.notify();//当队列不满的时候,就唤醒
        }
        return ret;
    }
}
public class Demo16 {
    
    public static void main(String[] args) throws InterruptedException {
    
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread customer = new Thread(()->{
    
            while(true){
    
                try {
    
                    int value = queue.take();
                    System.out.println("消费:"+value);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
            }
        }) ;
        customer.start();

        Thread producer = new Thread(()->{
    
           int value = 0;
            while(true){
    
                try {
    
                    queue.put(value);
                    System.out.println("生产:"+value);
                    value++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
           }
        });
        producer.start();
    }
}
原网站

版权声明
本文为[Living_Amethyst]所创,转载请带上原文链接,感谢
https://blog.csdn.net/living_amethyst/article/details/126073180