JUC Concurrent Programming 06: An In-Depth Analysis of the AQS Queue Synchronizer Source Code
2022-04-23 10:04:00 【Half old 518】
Let's start with the ReentrantLock source code.
public void lock() {
    sync.lock();
}
public void unlock() {
    sync.release(1);
}
As it turns out, lock, unlock, and the other core methods are all implemented through sync, and sync is an instance of an inner class.
abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
}
This inner class extends AbstractQueuedSynchronizer (AQS), the queue synchronizer that is the focus of this article. AQS is the foundation of the locking mechanism: it encapsulates lock acquisition, lock release, and the wait queue.
The wait queue is the key to thread scheduling. Its data structure is a doubly linked list; refer to the figure below.
Let's look at what each Node contains. Open the AQS source code; it defines the inner class Node.
static final class Node {
    // Each node is in either shared or exclusive mode, used by shared locks and exclusive locks respectively
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    // Wait status values
    // CANCELLED: the only status greater than 0; the node has been cancelled
    static final int CANCELLED = 1;
    // SIGNAL: the successor of this node is (or will soon be) parked, so this node must wake it when it releases
    static final int SIGNAL = -1;
    // CONDITION: the node is waiting in a condition queue
    static final int CONDITION = -2;
    // PROPAGATE: a release should propagate to later nodes; used for shared locks
    static final int PROPAGATE = -3;
    volatile int waitStatus; // wait status value
    volatile Node prev; // links of the doubly linked list
    volatile Node next;
    volatile Thread thread; // the thread wrapped by this node while it waits in the queue
    Node nextWaiter; // in the wait queue: the mode marker (SHARED or EXCLUSIVE); in a condition queue: the next node
    // Whether this node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // Return the predecessor node
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {
        // Used to establish the initial head or the SHARED marker
    }
    Node(Thread thread, Node mode) {
        // Used by addWaiter (wait queue)
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        // Used by Condition (condition queue)
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
Stepping back out of Node to AQS itself: it defines three fields. head and tail default to null, state defaults to 0, and the AQS constructor does not assign any of them.
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // Current lock state
In fact, the doubly linked list is initialized lazily, on first use, as we will see later. First, look at one of the operations that sets state.
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
It is implemented with unsafe's compareAndSwapInt(), that is, the CAS algorithm. unsafe is itself a field of AQS.
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
// Compute the memory offset of each field (relative to the start of the object that declares it)
static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}
/** CAS the head node; the expected value is always null, so this only succeeds during initialization */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** CAS the tail node */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/** CAS the waitStatus field of a node */
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}
/** CAS the next field of a node */
private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
In fact, the Unsafe methods called here are native methods; readers can open the class and check. They locate the field's memory address directly and operate on the data in memory, which is efficient. The static block in AQS computes the offset of each field within its class, and that offset is passed to the Unsafe methods.
Looking back, the fields that unsafe operates on are all declared volatile. CAS makes each update atomic, and volatile is what ensures the updated value is visible to other threads.
private volatile int state; // Current lock state
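The same pattern (a volatile field plus a CAS on it) can be tried without Unsafe. The following is a minimal sketch, assuming the supported JDK class AtomicIntegerFieldUpdater as a stand-in for the Unsafe offset lookup; the class name CasStateDemo is hypothetical and this is not how AQS itself is written.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class CasStateDemo {
    // Like AQS.state: volatile, so a successful update is visible to other threads.
    private volatile int state;

    // Looks the field up by name once (comparable to computing stateOffset),
    // then performs CAS operations on it.
    private static final AtomicIntegerFieldUpdater<CasStateDemo> STATE =
            AtomicIntegerFieldUpdater.newUpdater(CasStateDemo.class, "state");

    // Succeeds only if the current value equals expect.
    boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

    int getState() {
        return state;
    }

    public static void main(String[] args) {
        CasStateDemo d = new CasStateDemo();
        System.out.println(d.compareAndSetState(0, 1)); // true: state was 0
        System.out.println(d.compareAndSetState(0, 1)); // false: state is already 1
        System.out.println(d.getState());               // 1
    }
}
```

The second call fails because the expected value no longer matches, which is exactly how a losing thread learns that another thread changed state first.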
Now that we have a general idea of the underlying mechanism of AQS, let's see how it is used. Here are the five methods a subclass can override.
// Acquire the synchronization state in exclusive mode: check whether the state allows acquisition and, if so, set it with CAS and return true
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// Release the synchronization state in exclusive mode
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// Acquire the synchronization state in shared mode: a negative return value means failure, zero or a positive value means success
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// Release the synchronization state in shared mode
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// Whether the synchronizer is held exclusively by the current thread (whether the current thread holds the lock)
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
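To make the override points concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. It overrides only the three exclusive-mode methods; the class name SimpleMutex and the convention "state 0 = unlocked, 1 = locked" are assumptions for illustration, not part of ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Assumed convention: 0 = unlocked, 1 = locked.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // plain write is enough: only the owner reaches this line
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }           // queues and parks if tryAcquire fails
    boolean tryLock()  { return sync.tryAcquire(1); } // single attempt, never blocks
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex m = new SimpleMutex();
        m.lock();
        System.out.println(m.tryLock());  // false: not reentrant, state is already 1
        m.unlock();
        System.out.println(m.isLocked()); // false
    }
}
```

All queueing, parking, and waking is inherited from AQS; the subclass only decides what "acquired" and "released" mean in terms of state.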
Now let's take ReentrantLock's fair lock as an example and see how it overrides them.
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }
    ...
}
Look at the lock method first. It calls AQS's acquire() method, which in turn calls tryAcquire. AQS only declares tryAcquire; ReentrantLock implements it differently for the fair and the non-fair lock, and we skip the details for now. acquire uses the short-circuiting && operator: if tryAcquire obtains the lock, nothing after it runs. Otherwise addWaiter wraps the current thread in a node and appends it to the wait queue, and that node is then passed to acquireQueued. In other words, if another thread holds the lock, the current thread is added to the wait queue.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // the new node is in exclusive mode (EXCLUSIVE)
        selfInterrupt();
}
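The effect of the failed tryAcquire can be observed from outside through ReentrantLock's monitoring methods. This is a small sketch, assuming a hypothetical class QueueingDemo: the main thread holds a fair lock, a second thread calls lock(), and the main thread waits until that thread shows up in the wait queue.

```java
import java.util.concurrent.locks.ReentrantLock;

final class QueueingDemo {
    // Returns the queue length observed while a second thread waits for the lock.
    static int queueLengthWhileContended() {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        lock.lock();                                  // main thread: tryAcquire succeeds
        Thread waiter = new Thread(() -> {
            lock.lock();                              // tryAcquire fails, so the thread is enqueued
            lock.unlock();
        });
        int observed;
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();                       // wait until the waiter's node is linked in
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();                            // release wakes the queued thread
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + queueLengthWhileContended());
    }
}
```

hasQueuedThread and getQueueLength are intended for monitoring, not for synchronization control; they are used here only to make the queue visible.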
Let's follow addWaiter.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to enqueue with a single CAS; if it succeeds, return
    Node pred = tail;
    if (pred != null) {
        // tail is null until the queue is initialized; a non-null tail means other nodes have already been inserted
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The CAS failed (another thread enqueued at the same time) or tail is null (queue not initialized yet)
    enq(node);
    return node;
}
The comments above explain the fast path. Now let's see how enq is implemented; it is the spin (retry loop) mechanism of AQS.
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // head and tail have not been initialized yet
            if (compareAndSetHead(new Node())) // CAS head from null to a new empty (dummy) node
                tail = head; // head and tail now point to the same dummy node
        } else {
            node.prev = t; // enqueue: point the current node's prev at the old tail
            if (compareAndSetTail(t, node)) {
                // the current node is now the tail of the queue
                t.next = node;
                return t;
            }
        }
    }
}
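The enqueue loop can be reproduced in isolation. Below is a simplified sketch that uses AtomicReference in place of the Unsafe-based CAS helpers; the class TinyWaitQueue, its Node type, and the "dummy" label are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

final class TinyWaitQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as AQS.enq: retry until the node is linked; returns the predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazy initialization: install a dummy head, then loop again.
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;                    // set prev before publishing the node
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // next is set only after the CAS succeeds
                    return t;
                }
            }
        }
    }

    // Walks the queue from head to tail through the next links.
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head.get(); n != null; n = n.next) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        TinyWaitQueue q = new TinyWaitQueue();
        Node predA = q.enq(new Node("A"));
        Node predB = q.enq(new Node("B"));
        System.out.println(predA.name + ", " + predB.name); // dummy, A
        System.out.println(q.describe());                   // dummy -> A -> B
    }
}
```

Because prev is written before the tail CAS and next only after it, a backward walk from the tail always sees every enqueued node, while a forward walk through next may briefly miss the newest one.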
That finishes addWaiter. Going back, its result is passed as an argument to acquireQueued. Once acquireQueued receives the node (which has successfully joined the wait queue and is ready to queue for the lock), it also enters a spin loop.
The process can be understood with the help of the following figure.
The code is as follows.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // failure flag, initially true
    try {
        boolean interrupted = false; // interrupt flag
        for (;;) {
            final Node p = node.predecessor(); // predecessor of the current node
            if (p == head && tryAcquire(arg)) {
                // If the predecessor is the head, the current node is first in line (node 1 in the figure above), so it calls tryAcquire to grab the lock
                setHead(node); // lock acquired: the current node becomes the new head
                p.next = null; // help GC
                failed = false;
                return interrupted; // normal return; reports whether the thread was interrupted while waiting
            }
            // Either the node is not first in line or tryAcquire failed. Make sure the predecessor's waitStatus is SIGNAL (SIGNAL means the successor is waiting to be woken). If it had to be set just now, loop again; otherwise park the thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // suspends the thread through Unsafe (the thread blocks here, waiting for the lock)
    return Thread.interrupted(); // after waking up, report and clear the interrupt status
}
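One visible consequence of the interrupted flag is that lock() never throws on interruption; the thread keeps waiting and still has its interrupt status set once it owns the lock. The sketch below, with the hypothetical class InterruptWhileQueuedDemo, interrupts a thread after it has joined the wait queue and checks the flag after the acquisition.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class InterruptWhileQueuedDemo {
    // Returns the waiter's interrupt status as seen right after it acquired the lock.
    static boolean interruptStatusAfterAcquire() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagSeen = new AtomicBoolean();
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // keeps waiting even if interrupted
            try {
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        try {
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();   // wait until the waiter is in the queue
            }
            waiter.interrupt();   // interrupt it while it is still waiting
        } finally {
            lock.unlock();        // only now can the waiter acquire the lock
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flagSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted after acquire: " + interruptStatusAfterAcquire());
    }
}
```

Code that needs to abandon the wait on interruption would use lockInterruptibly() instead, which throws InterruptedException.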
Now look at the logic of shouldParkAfterFailedAcquire.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // the predecessor is already SIGNAL, so it is safe to park
        return true;
    if (ws > 0) {
        // ws > 0 means the predecessor has been cancelled; walk backwards until the first node that has not been cancelled
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; // unlink all the cancelled nodes in between
    } else {
        // The predecessor is not SIGNAL yet; use CAS to set it to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false; // do not park yet: the caller loops, retries the acquire, and re-checks whether the predecessor is now SIGNAL
}
The AQS code relies on the park and unpark methods. park suspends the current thread, and unpark releases a thread from that suspended state. Consider the following example.
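import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class Demo22 {
    public static void main(String[] args) {
        Thread t = Thread.currentThread(); // the main thread, which will be parked
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("unpark thread t");
                LockSupport.unpark(t);
                // t.interrupt(); // interrupting t would also make park() return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
        }).start();
        System.out.println("Thread t to be park...");
        LockSupport.park();
        System.out.println("Thread t unpark successfully");
    }
}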
Its output is:
Thread t to be park...
unpark thread t
Thread t unpark successfully
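Two further properties of park matter for reading AQS: an unpark issued before park is not lost (it leaves a permit that the next park consumes), and park returns immediately when the thread's interrupt status is set. A minimal sketch, using the hypothetical class ParkPermitDemo:

```java
import java.util.concurrent.locks.LockSupport;

final class ParkPermitDemo {
    // unpark first, park second: the permit is already available, so park does not block.
    static boolean parkReturnsAfterEarlyUnpark() {
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park(); // consumes the permit and returns
        return true;        // reached only because park returned
    }

    // park also returns when the interrupt status is set; it does not clear the status.
    static boolean parkReturnsWhenInterrupted() {
        Thread.currentThread().interrupt();
        LockSupport.park();
        return Thread.interrupted(); // true, and clears the status (as parkAndCheckInterrupt does)
    }

    public static void main(String[] args) {
        System.out.println(parkReturnsAfterEarlyUnpark()); // true
        System.out.println(parkReturnsWhenInterrupted());  // true
    }
}
```

The permit is why a release that races with a thread about to park cannot cause a lost wake-up, and the interrupt behavior is why parkAndCheckInterrupt has to call Thread.interrupted() after park returns.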
That completes the lock method of ReentrantLock's fair lock. Next, let's look at its tryAcquire method, that is, how it actually grabs the lock.
// Fair implementation of the reentrant exclusive lock
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // current AQS state; in exclusive mode, 0 means the lock is free and a value greater than 0 means it is held
    if (c == 0) {
        if (!hasQueuedPredecessors() && // is another thread queued ahead of the current one, i.e. does the current thread have to queue?
            compareAndSetState(0, acquires)) {
            // CAS on the state succeeded, so the lock has been acquired
            setExclusiveOwnerThread(current); // record the current thread as the owner of the exclusive lock
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // The state is not 0, so the lock is held; check whether the holder is the current thread (reentrant acquire)
        int nextc = c + acquires; // each additional lock() increases the state by one
        if (nextc < 0) // int overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false; // in every other case return false: acquiring the lock failed
}
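The reentrant branch can be observed through getHoldCount(), which reports how many times the current thread holds the lock. A short sketch with the hypothetical class ReentrantCountDemo:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

final class ReentrantCountDemo {
    // Records the hold count after the first and after the nested lock() call.
    static int[] nestedHoldCounts() {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        int[] counts = new int[2];
        lock.lock();                                  // state 0 -> 1 via CAS
        try {
            counts[0] = lock.getHoldCount();
            lock.lock();                              // same owner: state 1 -> 2, no queueing
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(nestedHoldCounts())); // [1, 2]
    }
}
```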
That covers the locking process. Next, let's look at unlocking.
Look at ReentrantLock's unlock method. It calls release with the argument 1, because each unlock releases the lock once.
public void unlock() {
    sync.release(1);
}
Now look at AQS's release method.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // The lock has been fully released
        Node h = head;
        if (h != null && h.waitStatus != 0) // the head exists and its waitStatus is not 0 (initially 0; -1 once a successor has set it to SIGNAL)
            unparkSuccessor(h); // wake the next waiting node
        return true;
    }
    return false;
}
Now let's look at unparkSuccessor and see how it wakes the next node.
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) // a negative status (for example SIGNAL) is reset to 0
        compareAndSetWaitStatus(node, ws, 0);
    // Get the successor of the current node
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // There is no next node, or it has been cancelled (status > 0): search for another node that should be unparked
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // walk from the tail towards the front
            if (t.waitStatus <= 0)
                s = t; // keep the non-cancelled node closest to the front
    }
    if (s != null) // if nothing was found, do nothing; otherwise unpark its thread
        LockSupport.unpark(s.thread);
}
Next, look at tryRelease, which release calls.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // current state minus the number of releases (1 here)
    if (Thread.currentThread() != getExclusiveOwnerThread()) // exclusive lock: if the current thread does not hold it, throw
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // The state after this release is 0, so the lock is fully released
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c); // write back the new state
    return free; // whether the lock is now completely free
}
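Both branches of tryRelease can be checked from the outside. This sketch, with the hypothetical class ReleaseDemo, shows that a lock held twice stays locked after one unlock(), and that unlock() without ownership throws IllegalMonitorStateException.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReleaseDemo {
    // Held twice, released once: state is still 1, so the lock is not free yet.
    static boolean stillLockedAfterOneUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                            // state 2 -> 1, tryRelease returns false
        boolean lockedAfterFirst = lock.isLocked();
        lock.unlock();                            // state 1 -> 0, owner cleared
        return lockedAfterFirst && !lock.isLocked();
    }

    // The current thread is not the owner, so tryRelease throws.
    static boolean unlockWithoutOwnershipThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(stillLockedAfterOneUnlock());    // true
        System.out.println(unlockWithoutOwnershipThrows()); // true
    }
}
```

This is also why release only wakes a successor when tryRelease returns true: waking a waiter while the owner still holds the lock would be pointless.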
The following figure summarizes the locking and release mechanism.
The non-fair lock, in brief, works as follows. It does not check the wait queue first: it tries to grab the lock with CAS right away, and only falls back to acquire(1), and with it the wait queue, if that fails.
final void lock() {
    if (compareAndSetState(0, 1)) // try to barge in immediately, ignoring any queued threads
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { // unlike the fair version, no hasQueuedPredecessors() check
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
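Which of the two implementations a ReentrantLock uses is chosen at construction time. A minimal sketch, with the hypothetical class FairnessDemo, showing the public constructors and isFair():

```java
import java.util.concurrent.locks.ReentrantLock;

final class FairnessDemo {
    // Builds a lock with the requested policy and reports what the lock says about itself.
    static boolean reportsFair(boolean fair) {
        return new ReentrantLock(fair).isFair();
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantLock().isFair()); // false: the no-arg constructor is non-fair
        System.out.println(reportsFair(true));            // true
    }
}
```

Non-fair is the default because barging usually gives higher throughput; the fair lock trades some throughput for first-come, first-served ordering.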
Copyright notice
This article was written by [Half old 518]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/04/202204230955171151.html