AQS原理及多线程资源获取优化探讨
AQS原理及多线程资源获取优化探讨
在Java并发编程中,AQS(AbstractQueuedSynchronizer)是一个非常重要的基础框架,它提供了构建锁和其他同步器的基础。理解AQS的原理对于掌握Java并发编程至关重要。本文将从多个角度深入探讨AQS的原理,包括其定义、核心内容、与Lock锁的继承关系、公平锁与非公平锁的直观体现、acquire、tryAcquire、addWaiter等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。同时,作为一个大数据工程师,我们将通过Java代码演示这些原理。
2.1 AQS的定义
AQS,全称AbstractQueuedSynchronizer,是Java并发包(java.locks)下的一个抽象类。它定义了一套多线程访问共享资源的同步器框架,许多我们使用的同步器都是基于它来实现的,如常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类。
2.2 AQS的核心内容
2.2.1 核心成员变量
AQS的核心成员变量包括:
private transient volatile ode head;
:CHL队列的头部节点,延迟初始化。除了初始化,它只通过setHead()
方法进行修改。如果head
节点存在,head
节点的waitStatus
保证不会被CACELLED
。private transient volatile ode tail;
:CHL队列的尾部节点,延迟初始化。仅通过enq()
方法新增等待的节点。private volatile int state;
:表示共享资源的获取情况。为0时代表着没有线程获取过此资源,而等它大于0时,则表示有线程正在获取着资源。由于AQS支持可重入机制,state
为0表示没有线程拿到锁,而当state
为n时(n >= 1),表示线程拿到锁,n为重入次数。private transient Thread exclusiveOwnerThread;
:表示当前占据锁的线程。
2.2.2 核心方法
AQS的核心方法包括:
tryAcquire(int arg)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int arg)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int arg)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int arg)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。isHeldExclusively()
:该线程是否正在独占资源。只有用到condition才需要去实现它。
AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法。因为一般自定义同步器要么是独占要么是共享方式,只需实现tryAcquire-tryRelease
或tryAcquireShared-tryReleaseShared
中的一种组合即可。当然,AQS也支持子类同时实现独占和共享两种模式,如ReentrantReadWriteLock
。
.1 Lock接口与AQS的关系
Lock接口的最主要实现类ReentrantLock中所有的方法实际上都是调用了其静态内部类Sync中的方法。Sync继承了AbstractQueuedSynchronizer(AQS),也就是说,Lock锁的整个体系是基于AQS同步器实现的。
.2 ReentrantLock与AQS的具体实现
ReentrantLock并非是直接继承AQS并实现里面的方法的,而是由里面的Sync类来继承AQS,并且里面还划分了onfairSync和FairSync两个Sync子静态内部类来分别对公平锁和非公平锁做逻辑实现。
4.1 定义与特点
- 公平锁:多个线程按照申请锁的顺序来获取锁,即按照线程的先后顺序来排队获取锁。当一个线程释放锁后,等待时间最长的线程会获得锁的访问权。公平锁能够保证每个线程都有机会获取到锁,避免饥饿现象的发生,但整体效率相对比较低。
- 非公平锁:多个线程获取锁的顺序是不确定的,不按照申请锁的顺序来排队。一个线程在等待锁时,不管自己是不是在等待队列的头部,都有机会在其他线程释放锁后立即获取锁。非公平锁相对公平锁增加了获取资源的不确定性,但是整体效率得以提升,但可能会产生饥饿现象。
4.2 在ReentrantLock中的实现
在Java的ReentrantLock中,可以通过构造函数传入一个boolean值fair
来设置锁是否为公平锁。默认是非公平锁,这是因为非公平锁的优点在于吞吐量比公平锁大。
- 非公平锁:在获取锁时,直接尝试CAS将当前的
state
从0置为1,如果成功则获取锁;如果失败,则执行acquire
方法加入等待队列。 - 公平锁:在获取锁时,不直接尝试CAS,而是直接执行
acquire
方法加入等待队列。
5.1 acquire方法
acquire
方法是AQS中用于获取独占锁的核心方法。其源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(ode.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(int arg):由子类实现,尝试获取独占锁。如果成功,则返回true,方法直接返回;如果失败,则返回false,执行后续逻辑。
- addWaiter(ode.EXCLUSIVE):将当前线程包装成一个独占式的节点,加入同步队列的队尾,并返回当前线程所在的节点。
- acquireQueued(addWaiter(ode.EXCLUSIVE), arg):如果当前节点是等待节点的第一个(即
),就尝试获取资源。如果该方法返回true,则会进入
selfInterrupt()
的逻辑,进行阻塞。 - selfInterrupt():响应中断的逻辑,与主线关系不大,这里不详细分析。
5.2 tryAcquire方法
tryAcquire
方法是AQS提供给子类实现的钩子方法,子类可以自定义实现独占式获取资源的方式。以ReentrantLock的非公平锁实现为例:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = ();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 如果
state
为0(表示锁未被占用),则尝试通过CAS将state
设置为请求的数量acquires
,并设置当前线程为锁的持有者,返回true表示获取锁成功。 - 如果当前线程已经是锁的持有者(即重入锁的情况),则将
state
增加请求的数量acquires
,表示锁的重入次数增加,返回true表示获取锁成功。 - 如果上述两种情况都不满足,则返回false表示获取锁失败。
5. addWaiter方法
addWaiter
方法将当前线程包装成一个节点,加入同步队列的队尾。其源码如下:
private ode addWaiter(ode mode) {
ode node = new ode((), mode);
ode pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
= node;
return node;
}
}
enq(node);
return node;
}
- 创建一个新的节点
node
,将当前线程和模式(独占或共享)封装到节点中。 - 如果队列不为空(即
tail
不为null),则尝试通过CAS将新节点设置为队尾,并更新原队尾的next
指针指向新节点。 - 如果CAS失败(可能是因为队列为空或并发修改导致),则调用
enq(node)
方法不断尝试,直到设置成功。
enq(node)
方法的源码如下:
private ode enq(final ode node) {
for (;;) {
ode t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new ode()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
= node;
return t;
}
}
}
}
- 自旋循环,直到将新节点成功加入队尾为止。
- 如果队列为空(即
tail
为null),则初始化头节点head
为一个虚拟节点(不代表任何线程),并将tail
指向head
。 - 如果队列不为空,则尝试通过CAS将新节点设置为队尾,并更新原队尾的
next
指针指向新节点。
6.1 acquireQueued方法
acquireQueued
方法用于判断当前节点是否需要阻塞,并在一定条件下将其阻塞。其源码如下:
final boolean acquireQueued(final ode node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final ode p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
= null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- 获取当前节点的前驱节点
p
。 - 如果前驱节点
p
是头节点head
,并且尝试获取锁成功(即tryAcquire(arg)
返回true),则将当前节点设置为头节点,并返回中断状态interrupted
。 - 如果不满足上述条件,则调用
shouldParkAfterFailedAcquire(p, node)
判断是否需要阻塞当前节点。如果需要阻塞,则调用parkAndCheckInterrupt()
将当前线程阻塞,并返回中断状态interrupted
。 - 如果在阻塞过程中被中断,或者在执行过程中抛出异常,则取消当前节点的获取锁操作,并处理中断状态
interrupted
。
6.2 shouldParkAfterFailedAcquire方法
shouldParkAfterFailedAcquire
方法用于判断当前节点是否需要阻塞。其源码如下:
private static boolean shouldParkAfterFailedAcquire(ode pred, ode node) {
int ws = pred.waitStatus;
if (ws == ode.SIGAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
= node;
} else {
compareAndSetWaitStatus(pred, ws, ode.SIGAL);
}
return false;
}
- 获取前驱节点
pred
的等待状态ws
。 - 如果
ws
为SIGAL
(表示后继节点需要被唤醒),则返回true,表示当前节点需要阻塞。 - 如果
ws
大于0(表示前驱节点已被取消),则不断向前遍历,直到到一个非取消状态的前驱节点,并更新当前节点的前驱指针prev
。然后返回false,表示当前节点不需要阻塞(因为前面还有有效节点在等待)。 - 如果
ws
为0或其他负值(表示正常等待状态),则通过CAS将前驱节点的等待状态设置为SIGAL
,并返回false,表示当前节点不需要阻塞(因为前驱节点已经设置为需要唤醒后继节点的状态)。
6. parkAndCheckInterrupt方法
parkAndCheckInterrupt
方法用于将当前线程阻塞,并检查在阻塞过程中是否被中断。其源码如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- 调用
LockSupport.park(this)
将当前线程阻塞。 - 返回
Thread.interrupted()
的结果,表示在阻塞过程中是否被中断。如果返回true,则表示被中断;如果返回false,则表示未被中断。
7.1 业务场景
假设我们有一个银行柜台服务,只有一个窗口可以办理业务。有多个客户(线程)需要办理业务,如果窗口被占用,则其他客户需要排队等待。这个场景非常适合使用ReentrantLock和AQS来实现。
7.2 示例代码
下面是一个基于ReentrantLock和AQS的简单示例代码,演示了如何实现上述业务场景。
代码语言:javascript代码运行次数:0运行复制import java.locks.Lock;
import java.locks.ReentrantLock;
public class BankCounter {
private final Lock lock = new ReentrantLock();
public void serveCustomer(String customerame) {
lock.lock();
try {
println(customerame + " 正在办理业务");
// 模拟业务办理时间
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
().interrupt();
println(customerame + " 被中断");
}
println(customerame + " 办理业务完毕");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
BankCounter bankCounter = new BankCounter();
Runnable task = () -> {
String customerame = ().getame();
bankCounter.serveCustomer(customerame);
};
Thread customer1 = new Thread(task, "客户1");
Thread customer2 = new Thread(task, "客户2");
Thread customer = new Thread(task, "客户");
customer1.start();
customer2.start();
customer.start();
}
}
7. 代码解析
- BankCounter类:
- 定义了一个
ReentrantLock
对象lock
,用于控制对银行柜台的访问。 serveCustomer
方法模拟了为客户办理业务的逻辑。在办理业务之前,先通过lock.lock()
获取锁;在办理业务之后,通过lock.unlock()
释放锁。如果在办理业务过程中被中断,则打印中断信息,并重新抛出中断异常(通过().interrupt()
设置中断状态)。
- 定义了一个
- main方法:
- 创建了一个
BankCounter
对象bankCounter
。 - 定义了三个线程(代表三个客户),每个线程都执行相同的任务:调用
bankCounter.serveCustomer
方法为客户办理业务。 - 启动三个线程,模拟多个客户同时办理业务的场景。
- 创建了一个
7.4 运行结果
当运行上述代码时,输出结果可能如下(具体顺序可能因线程调度而异):
代码语言:javascript代码运行次数:0运行复制客户1 正在办理业务
客户1 办理业务完毕
客户2 正在办理业务
客户2 办理业务完毕
客户 正在办理业务
客户 办理业务完毕
从输出结果可以看出,三个客户依次办理了业务,没有并发冲突。这是因为ReentrantLock
通过AQS实现了线程的同步控制,确保了同一时间只有一个客户能够办理业务。
本文深入探讨了AQS的原理及其在Java并发编程中的应用。首先解释了AQS的定义和核心内容,包括核心成员变量和方法。然后分析了Lock锁和AQS的继承关系,以及公平锁和非公平锁的直观体现。接着详细解析了AQS的acquire
、tryAcquire
、addWaiter
等方法的底层逻辑,以及AQS排队后如何重新尝试获取资源。最后通过一个具体的业务场景和示例代码演示了AQS在实际应用中的使用。
作为一个大数据工程师,理解AQS的原理对于处理并发数据访问、实现高效的同步机制至关重要。希望本文能够帮助读者更好地掌握AQS的原理和应用。
#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格
推荐阅读
留言与评论(共有 14 条评论) |
本站网友 墨脱隧道 | 22分钟前 发表 |
或者在执行过程中抛出异常 | |
本站网友 组装机器人 | 1分钟前 发表 |
如果当前线程已经是锁的持有者(即重入锁的情况) | |
本站网友 内分泌失调吃什么 | 28分钟前 发表 |
tryAcquireShared(int arg):共享方式 | |
本站网友 呼和浩特房产网 | 15分钟前 发表 |
方法直接返回;如果失败 | |
本站网友 注射胶原蛋白隆鼻多少钱 | 12分钟前 发表 |
如果ws为0或其他负值(表示正常等待状态) | |
本站网友 长江实业集团有限公司 | 17分钟前 发表 |
state为0表示没有线程拿到锁 | |
本站网友 怎样去除皱纹 | 14分钟前 发表 |
三 | |
本站网友 数学帝 | 17分钟前 发表 |
当一个线程释放锁后 | |
本站网友 迈克菲怎么样 | 27分钟前 发表 |
否则返回false | |
本站网友 硅谷传奇迅雷下载 | 20分钟前 发表 |
如果成功 | |
本站网友 上海保利茉莉公馆 | 30分钟前 发表 |
四 | |
本站网友 上海着火 | 19分钟前 发表 |
AQS排队后如何重新尝试获取资源6.1 acquireQueued方法acquireQueued方法用于判断当前节点是否需要阻塞 | |
本站网友 starvc | 17分钟前 发表 |
其源码如下:代码语言:javascript代码运行次数:0运行复制private static boolean shouldParkAfterFailedAcquire(ode pred |