GENGEN
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
主页
vuepress
  • GIT命令
  • python+django
  • vue cli搭建项目
  • babel es6转换es5
  • docker aliyun配置
  • npm 配置
  • linux 常用命令
  • Ubuntu 下Linux 命令
  • github
  • gitee
  • csdn
  • 关于我
  • java基础

    • JDK8 函数式编程
    • JDK8 新特性之Date-Time
    • Servlet 源码分析
    • ArrayList 源码
    • LinkedList 源码
    • HashMap 源码
    • String 源码
    • BigDecimal 源码
    • java 类的加载
    • Class 源码
    • Synchronized锁升级
    • 事务的传播机制
    • knowledge
  • JAVA WEB

    • Java Servlet
    • 权限设计
    • logback日志的链路追踪
  • DATABASE

    • MySQL EXPLAIN详解
    • MySQL 索引
    • MySQL 表锁、行锁
    • MySQL ACID与transcation
    • 分布式事务
    • MySQL MVCC机制
    • Mysql 乐观锁与悲观锁
    • 分布式锁1 数据库分布式锁
    • 分布式锁2 Redis分布式锁
    • 分布式锁3 ZK分布式锁
  • SpringCloud

    • SpringCloud服务注册中心之Eureka
    • SpringCloud服务注册中心之Zookeeper
    • SpringCloud服务调用之Ribbon
    • SpringCloud服务调用之OpenFeign
    • SpringCloud服务降级之Hystrix
    • SpringCloud服务网关之Gateway
    • SpringCloud Config分布式配置中心
    • SpringCloud服务总线之Bus
    • SpringCloud消息驱动之Stream
    • SpringCloud链路追踪之Sleuth
    • SpringCloud Alibaba Nacos
    • SpringCloud Alibaba Sentinel
  • Spring

    • SpringBoot
    • Spring-data-jpa入门
    • SpringCloud问题
    • dispatcherServlet 源码分析
    • @SpringBootApplication注解内部实现与原理
    • spring启动初始化初始化
  • 中间件

    • 分布式协调服务器Zookeeper
    • 服务治理Dubbo
    • 分布式配置管理平台Apollo
    • 消息中间件框架Kafka
    • 分布式调度平台ElasticJob
    • 可视化分析工具Kibana
    • ElacticSearch 基础
    • ElacticSearch进阶
    • ElacticSearch集成
  • 环境部署

    • 应用容器引擎Docker
    • DockerCompose服务编排
    • 负载均衡Nginx
    • Nginx的安装配置
    • K8S基础
  • 代码片段

    • listener 监听模式
    • spingboot 整合redis
    • XSS过滤
    • profile的使用
    • ConfigurationProperties注解
  • 设计模式

    • 工厂模式
    • 单例模式
    • 装饰者模式
    • 适配器模式
    • 模板方法模式
    • 观察者模式
  • 读书笔记

    • 《Spring in Action 4》 读书笔记
    • 《高性能mysql》 读书笔记
  • NoSQL

    • Redis基础
    • Redis高级
    • Redis集群
    • Redis应用
  • MQ

    • rabbitMQ基础
    • rabbitMQ高级
    • rabbitMQ集群
  • JVM

    • JVM体系架构概述
    • 堆参数调整
    • GC 分代收集算法
    • JVM 垃圾回收器
    • JVM 相关问题
  • JUC

    • JUC总览
    • volatile关键字
    • CAS
    • ABA问题
    • collections包下线程安全的集合类
    • Lock 锁
    • LockSupport
    • AQS
    • Fork/Join分支框架
    • JUC tools
    • BlockingQueue 阻塞队列
    • Executor 线程池
    • CompletableFuture
    • 死锁以及问题定位分析
  • Shell

    • shell命令
    • shell基础
  • Activiti

    • IDEA下的Activiti HelloWord
    • 流程定义的CRUD
    • 流程实例的执行
    • 流程变量
  • VUE

    • vue基础
    • vue router
    • Vuex
    • Axios 跨域
    • dialog 弹出框使用
    • vue 动态刷新页面
    • vue 封装分页组件
    • vue 动态菜单
    • vue 常用传值
  • Solidity 智能合约

    • Solidity 基础
    • Solidity ERC-20
    • Solidity 101
  • English

    • 时态

BlockingQueue 阻塞队列

  • 当队列是空的,从队列里获取元素操作会被阻塞。如果队列是满的,往队列里面添加元素会被阻塞。

BlockingQueue接口7个实现类

  • **ArrayBlockingQueue** 数组结构的有界阻塞队列
  • **LinkedBlockingQueue** 链表结构的有界(默认Integer.MAX_VALUE)阻塞队列(太大,接近无界)
  • PriorityBlockingQueue 支持优先级排序的无界阻塞队列
  • DelayQueue 优先级队列实现的延迟无界阻塞队列
  • **SynchronousQueue** 不存储元素的阻塞队列,即单个元素队列
  • LinkedTransferQueue 链表结构组成的无界阻塞队列
  • LinkedBlockingDeque 链表结构组成的双向阻塞队列

BlockingQueue核心方法

抛出异常

  • 插入 add(e)
  • 移除 remove()
  • 检查 element()

Tips

  • 当阻塞队列满时,再add会抛出IllegalStateException
  • 当阻塞队列空时,再remove会抛出NoSuchElementException

特殊值

  • 插入 offer(e)
  • 移除 poll()
  • 检查 peek()

Tips

  • 当插入时,返回boolean值判断是否插入成功
  • 当移除时,如果成功,返回移除的值,如果没值返回null

阻塞

  • 插入 put(e)
  • 移除 take()
  • 检查 不可用

Tips

  • 如果队列满了,新put数据线程会一直等待(阻塞)直到put成功或者响应中退出
  • 当队列为空时,消费者试图从队列中take数据线程一直等待直到队列可用

超时

  • 插入 offer(e,time,unit)
  • 移除 poll(time,unit)
  • 检查 不可用

Tips

  • 当队列满或者空时,offer、poll数据会阻塞直到拿到数据或者超过时限退出

SynchronousQueue 单元素队列演示

public class BlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue queue = new SynchronousQueue();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + ": put 1");
                queue.put("a");
                System.out.println(Thread.currentThread().getName() + ": put 2");
                queue.put("b");
                System.out.println(Thread.currentThread().getName() + ": put 3");
                queue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"AA").start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("take:" + queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println("take:" +queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println("take:" +queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"BB").start();
    }
}

阻塞队列的使用场景

  • 生产者消费者模式
  • 线程池
  • 消息中间件

阻塞队列版 生产者消费者模式

class Source {
    private volatile boolean flag = true; //默认开启消费标记
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> queue = null;

    public Source(BlockingQueue<String> blockingQueue) {
        this.queue = blockingQueue;
        System.out.println(queue.getClass().getName());
    }


    //生产
    public void producer() throws Exception {
        String data = null;
        boolean retVal;
        while (flag) {
            data = atomicInteger.incrementAndGet() + "";
            retVal = queue.offer(data, 2L, TimeUnit.SECONDS);
            //插入成功
            if (retVal) {
                System.out.println(Thread.currentThread().getName() + "\t插入数据" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t插入数据" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("生产停止!");
    }

    //消费
    public void consumer() throws Exception {
        String result;
        while (flag) {
            result = queue.poll(2L, TimeUnit.SECONDS);
            if (null == result || result.equalsIgnoreCase("")) {
                System.out.println("队列超过2S无数据!消费退出!");
                flag = false;
                return;
            }
            System.out.println(Thread.currentThread().getName() + "\t消费队列-->" + result + "成功");
        }
    }
}

public class ProducerConsumer {

    public static void main(String[] args) {
        Source source = new Source(new ArrayBlockingQueue<>(2));
        new Thread(() -> {
            try {
                source.producer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "pord").start();


        new Thread(() -> {
            try {
                source.consumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "cons").start();
    }
}

Tips

  • 不用阻塞队列,可以用lock condition实现
Last Updated:
Contributors: wal365@126.com
Prev
JUC tools
Next
Executor 线程池