【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 并发编程

    • 可重入独占锁ReentrantLock
    • 共享锁Semaphore
    • 阻塞队列ArrayBlockingQueue
    • 阻塞队列LinkedBlockingQueue
      • 前言
      • 1 添加元素
      • 2 取出元素
    • HashMap详解
    • ThreadPoolExecutor线程池
  • 框架

  • 源码阅读
  • 并发编程
luoxiaofeng
2022-07-28
目录

阻塞队列LinkedBlockingQueue

# 前言

LinkedBlockingQueue内部基于链表实现,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue也被称作无界队列,代表几乎没有界限。LinkedBlockingQueue也支持指定长度。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

重要方法

public void put(E e) throws InterruptedException

put(E e) 表示添加元素

public E take() throws InterruptedException

take() 表示取出元素

构造方法

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
1
2
3
4
5

重要属性

private final int capacity; // 容量,指定容量就是有界队列

private final AtomicInteger count = new AtomicInteger(); // 元素数量

transient Node<E> head; // 链表头

private transient Node<E> last; // 链表尾

private final ReentrantLock takeLock = new ReentrantLock(); // 取出元素的锁

private final Condition notFull = putLock.newCondition(); // 添加元素的锁

// 消费者(内置条件队列)
private final Condition notEmpty = takeLock.newCondition();

// 生产者(内置条件队列)
private final ReentrantLock putLock = new ReentrantLock(); 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

单链表结构

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
1
2
3
4
5

# 1 添加元素

public void put(E e) throws InterruptedException {
    //不允许添加空元素
    if (e == null) throw new NullPointerException();
    int c = -1;
    //新建节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //用putLock加锁
    putLock.lockInterruptibly();
    try {
        //阻塞队列元素个数等于容量时,生产者挂起
        while (count.get() == capacity) {
            notFull.await();
        }
        //阻塞队列元素个数不等于容量时,入队
        enqueue(node);
        //总数加1,返回旧值
        c = count.getAndIncrement();
        //总数小于容量时,唤醒生产者。相当于提前唤醒,不需要等到取元素的时候才唤醒
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放生产的锁
        putLock.unlock();
    }
    //当原队列元素个数为0(当前个数为1),立即唤醒消费者
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    //当前节点插入到队列尾部,并且队列尾部指针指向当前节点
    last = last.next = node;
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    //用takeLock加锁
    takeLock.lock();
    try {
        //唤醒消费者
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 2 取出元素

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //用takeLock加锁
    takeLock.lockInterruptibly();
    try {
        //阻塞队列元素个数为0时,消费者挂起
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        //总数减1,返回旧值
        c = count.getAndDecrement();
        //阻塞队列有元素时,唤醒消费者
        if (c > 1)
            notEmpty.signal();
    } finally {
        //释放消费的锁
        takeLock.unlock();
    }
    //队列元素快满了的时候(总数-1),立即唤醒生产者
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    Node<E> h = head;
    //first = 头结点的后继节点
    Node<E> first = h.next;
    //准备回收原头节点
    h.next = h; // help GC
    //头结点指针指向后继节点
    head = first;
    //返回的元素从原头结点的后继节点中获取
    E x = first.item;
    //后继节点的元素被取出后清空(此时first已经成为头结点)
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    //putLock加锁
    putLock.lock();
    try {
        //唤醒生产者
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
阻塞队列ArrayBlockingQueue
HashMap详解

← 阻塞队列ArrayBlockingQueue HashMap详解→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式