【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 并发编程

    • 可重入独占锁ReentrantLock
    • 共享锁Semaphore
    • 阻塞队列ArrayBlockingQueue
      • 前言
      • 1 添加元素
      • 2 取出元素
    • 阻塞队列LinkedBlockingQueue
    • HashMap详解
    • ThreadPoolExecutor线程池
  • 框架

  • 源码阅读
  • 并发编程
luoxiaofeng
2022-07-27
目录

阻塞队列ArrayBlockingQueue

# 前言

ArrayBlockingQueue内部用数组存储元素(初始化时需要指定容量大小),是最典型的有界阻塞队列,利用 ReentrantLock 实现线程安全。

在生产者-消费者模型中使用时,如果生产速度远远大于消费速度,容易导致队列填满,大量生产线程被阻塞。

使用独占锁 ReentrantLock 实现线程安全,入队和出队操作使用同一个锁对象,这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

重要方法

public void put(E e) throws InterruptedException

put(E e) 表示添加元素

public E take() throws InterruptedException

take() 表示取出元素

构造方法

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
1
2
3
4
5
6
7
8

重要属性

final Object[] items; // 元素数组

int takeIndex; // 下一个待取出的数组下标

int putIndex; // 下一个待添加的数组下标

int count; // 元素个数

final ReentrantLock lock; // 队列用到的锁

private final Condition notEmpty; // 消费者(内置条件队列)

private final Condition notFull; // 生产者(内置条件队列)
1
2
3
4
5
6
7
8
9
10
11
12
13

# 1 添加元素

public void put(E e) throws InterruptedException {
    //不允许添加空元素
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //阻塞队列已经满了,则生产者挂起
        while (count == items.length)
            //await()方法的实现使用了条件队列
            notFull.await();
        //阻塞队列未满,则入队
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    //元素放进数组中,下标为putIndex
    items[putIndex] = x;
    //元素下标加1,若达到数组长度(下标最大为:数组长度-1),则重置为0
    //此次用到了环形数组的设计
    if (++putIndex == items.length)
        putIndex = 0;
    //总数加1
    count++;
    //唤醒消费者(入队了元素,数组不为空)
    notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

环形数组

image-20220727220956221

# 2 取出元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //阻塞队列空了,消费者挂起
        while (count == 0)
            notEmpty.await();
        //阻塞队列不为空,则出队
        return dequeue();
    } finally {
        lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //获取takeIndex下标的数组元素,后续返回
    E x = (E) items[takeIndex];
    //将takeIndex下标的元素置空
    items[takeIndex] = null;
    //元素下标加1,若达到数组长度(下标最大为:数组长度-1),则重置为0
    //此次用到了环形数组的设计
    if (++takeIndex == items.length)
        takeIndex = 0;
    //总数减1
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒生产者(此时阻塞队列中有空位)
    notFull.signal();
    return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
共享锁Semaphore
阻塞队列LinkedBlockingQueue

← 共享锁Semaphore 阻塞队列LinkedBlockingQueue→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式