【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 并发编程

    • 可重入独占锁ReentrantLock
    • 共享锁Semaphore
      • 前言
      • 1 获取许可(非公平)
      • 2 释放许可
      • 3 获取许可(公平)
    • 阻塞队列ArrayBlockingQueue
    • 阻塞队列LinkedBlockingQueue
    • HashMap详解
    • ThreadPoolExecutor线程池
  • 框架

  • 源码阅读
  • 并发编程
luoxiaofeng
2022-07-26
目录

共享锁Semaphore

# 前言

Semaphore(信号量)主要作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,实现逻辑与ReentrantLock很相似,有很多共用的方法,里面也使用到相同的CLH队列。

重要方法

public void acquire() throws InterruptedException

acquire() 表示阻塞并获取许可

public void release()

release() 表示释放许可

# 1 获取许可(非公平)

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    //线程已被中断,则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果获取不到许可(state < 0),则去CLH队列排队
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12

提示

Semaphore也支持公平和非公平两种方式,tryAcquireShared 下有两种实现。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    //自旋直到返回结果
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        //返回state小于0
        if (remaining < 0 ||
            //或者state更新成功(大于等于0)
            compareAndSetState(available, remaining))
            return remaining;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //新增一个共享类型的节点到队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果当前节点的前驱节点时头结点
            if (p == head) {
                //去尝试获取许可
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取许可成功则把当前节点设置成头结点,并且尝试唤醒队列中下一节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //1.前驱节点状态为可唤醒,则当前节点所属线程可以park住
            //2.park住线程;清除线程的中断标识,并返回清除之前的状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //该判断代码块与ReentrantLock的实现一致,
                // 不同的是ReentrantLock只更新中断标识变量,Semaphore直接抛出中断异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            //与ReentrantLock实现一致
            cancelAcquire(node);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

注意

以下逻辑较难理解,目前未理解透彻。

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        //当前节点设置成头结点
        setHead(node);
        /*
         * 尝试去唤醒队列中的下一个节点,如果满足如下条件:
         * 调用者明确表示"传递"(propagate > 0),
         * 或者h.waitStatus为PROPAGATE(被上一个操作设置)
         * 并且
         *   下一个节点处于共享模式或者为null。
         *
         * 这两项检查中的保守主义可能会导致不必要的唤醒,但只有在有
         * 有在多个线程争取获得/释放同步状态时才会发生,所以大多
         * 数情况下会立马获得需要的信号
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doReleaseShared() {
    /*
     * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的
     * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤醒
     * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证
     * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必
     * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样
     * 的是,如果(头结点)等待状态设置失败,重新检测。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //唤醒状态更新回0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                //唤醒头结点的后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 如果头结点发生变化,则继续循环。否则,退出循环。
        if (h == head)
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 2 释放许可

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //更新state
    if (tryReleaseShared(arg)) {
        //唤醒队列中的节点
        doReleaseShared();
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        //给state加上释放的许可
        int next = current + releases;
        //超出Integer.MAX_VALUE,即2^31,抛异常
        if (next < current) 
            throw new Error("Maximum permit count exceeded");
        //cas方式更新state
        if (compareAndSetState(current, next))
            return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 3 获取许可(公平)

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
1
2
3
4
5
6
7
8
9
10
11

公平锁与非公平锁的实现基本一致,只是在更新 state 之前会先去检查下队列中是否其他节点在排队,有的话则直接去排队。

if (hasQueuedPredecessors())
    return -1;
1
2
可重入独占锁ReentrantLock
阻塞队列ArrayBlockingQueue

← 可重入独占锁ReentrantLock 阻塞队列ArrayBlockingQueue→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式