Предыдущая статья15 000 слов, 6 вариантов кода и 5 принципиальных схем позволят вам полностью понять SynchronizedупомянулsynchronizedЗависит отobject реализовано монитором
В мониторе объекта стек cxq и список записей реализуют очередь блокировки, а набор ожидания реализует очередь ожидания, тем самым реализуя синхронизированный режим ожидания/уведомления.
Параллельный пакет JUC в JDK также реализует режим ожидания/уведомления посредством аналогичных очередей блокировки и очередей ожидания.
В этой статье речь пойдет о краеугольном камне JUC, AQS (AbstractQueuedSynchronizer).
Предварительные знания, которые вам необходимо знать: CAS, энергозависимые
Если вы не знаете CAS, можете прочитать предыдущую статью про синхронизированный (ссылка выше).
Если вы не знаете волатильный, вы можете прочитать эту статью 5 кейсов и блок-схемы помогут вам понять ключевое слово volutability от 0 до 1
Эта статья посвящена AQS,Опишите структуру данных и проектирование в AQS простым и понятным способом. и приобретение、Освободить состояние Процесс исходного кода, состояние и т. д. синхронизации
Просмотр этой статьи займет около 10 минут. Прочитать ее можно, задав несколько вопросов.
Что такое АКС?
AQS — это очередь синхронизации (очередь блокировки), которая является основой параллельных пакетов. Многие компоненты синхронизации в параллельных пакетах реализованы с помощью AQS, например: ReentrantLock, блокировка чтения-записи, семафор и т. д.
AQS имеет три важных поля, а именно: head Головной узел, хвост Хвостовой узел, состояние Статус синхронизации
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//головной узел
private transient volatile Node head;
//Хвостовой узел
private transient volatile Node tail;
//Статус синхронизации
private volatile int state;
}
Головной и хвостовой узлы легко понять, поскольку AQS сам по себе представляет собой двусвязный список, а затем состояниеStatus синхронизациичто такое?
Использование Статуса в AQS синхронизация представляет ресурс, а затем использует CAS для получения/освобождения ресурса, например, присваивая ресурсу значение 1, и поток, чтобы попытаться получить ресурс из-за статуса В настоящее время синхронизация равна 1, поэтому тема CAS заменяет Статус синхронизациидля0,После успеха это означает, что ресурс получен.,После этого другие потоки не смогут получить ресурсы (состояние равно 0).,Пока поток, получивший ресурс, не освободит его.
Вышеописанное приобретение/освобождение ресурсов также можно понимать как приобретение/освобождение блокировок.
При этом все три поля изменяются изменчиво.,Используйте Volatility, чтобы обеспечить видимость памяти и не дать текущему потоку узнать, когда другие потоки изменяют эти данные.
Из приведенного выше описания мы можем знать, что AQS выглядит следующим образом.
Если потоку не удается получить ресурсы, он будет создан как узел и добавлен в AQS.
Node Node — это внутренний класс в AQS. Давайте посмотрим на некоторые важные поля в Node.
static final class Node {
//узелсостояние volatile int waitStatus;
//Узел-предшественник
volatile Node prev;
//узел-преемник
volatile Node next;
//Поток, представленный текущим узлом
volatile Thread thread;
//Последующий указатель узла при ожидании использования очереди
Node nextWaiter;
}
предыдущая, следующая и тема должны быть простыми для понимания.
И очередь синхронизации AQS, и очередь ожидания используют этот тип узла. Когда узел очереди ожидания пробуждается и выводится из очереди, удобно присоединиться к очереди синхронизации.
nextWaiter используется узлами для указания следующего узла в очереди ожидания.
waitStatus представляет статус узла
состояние | иллюстрировать |
---|---|
INITIAL | 0 исходныйсостояние |
CANCELLED | 1 Поток, соответствующий узлу, отменен |
SIGNAL | -1 Поток, соответствующий узлу, заблокирован и ожидает пробуждения для конкурирующих ресурсов. |
CONDITION | -2 Узел находится в очереди ожидания (условия). После ожидания пробуждения он будет поставлен в очередь из очереди ожидания и вступит в соревнование очереди синхронизации. |
PROPAGATE | -3 В случае общего будут пробуждаться все последующие общие узлы. |
Не беда, если вы не совсем понимаете ситуацию, мы поговорим об этом позже, когда встретимся.
После приведенного выше описания узел, вероятно, выглядит так
AQSЕсть еще один внутренний класс вConditionObject
Используется для реализации очередей ожидания/условная очередь,Давай поговорим об этом позже
AQS можно разделить на эксклюзивные、общий режим, эти два режима также могут поддерживать Реагировать на прерывание、Наносекундный тайм-аут
Под эксклюзивным режимом можно понимать только один поток, который может обрабатывать статус синхронизации одновременно.
Общий режим можно понимать как наличие нескольких потоков, способных выполнять статус синхронизации,Обычно используется в методахshared
логотип
Обычно используется в методахacquire
логотип Получить статус синхронизации,release
логотип Освободить состояние синхронизации
Все эти методы являются шаблонными.,предписанный процесс,Оставьте конкретную реализацию классу реализации (например, Получить статус синхронизация, как ее получить и оставить реализацию на усмотрение класса реализации)
Эксклюзивная церемония на самом деле означает, что только одному потоку разрешено монополизировать ресурс одновременно. В случае многопоточной конкуренции может быть только один поток. статус синхронизацииуспех
Нет Реагировать на прерыванияэксклюзивный доступ и Реагировать на прерывания、Похоже на: тайм-аут,Мы используемacquire
для Пример просмотра исходного кода
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
Способы попробовать Tweet статус синхронизации, параметр arg указывает, сколько Статуса получено синхронизация, возвращает true, если получено успешно выйдет из метода, оставив его реализацию классу реализации
addWaiter(Node.EXCLUSIVE) создает узел Эксклюзивная церемония и добавляет его в конец AQS, используя CAS+повторную попытку сбоя.
private Node addWaiter(Node mode) {
//Строим узел
Node node = new Node(Thread.currentThread(), mode);
//Хвостовой узел Нетдля Пустые правилаCASзаменить хвостузел
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//Хвостовой узелдля Пустой илиCASнеудачное выполнениеenq
enq(node);
return node;
}
private Node enq(final Node node) {
//Повторить попытку в случае неудачи
for (;;) {
Node t = tail;
//Нет хвостового узла Затем CAS устанавливает головной узел (головной и хвостовой узлы — это один узел), в противном случае CAS устанавливает хвостовой узел.
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq
Основной метод – спиннинг.(наполовину Нетвстреча Введите ожиданиемодель)идтиCASпоставь хвостузел,Если в AQS нет узла, головной и хвостовой узлы являются одним и тем же узлом.
Поскольку при добавлении хвостового узла существует конкуренция, для замены хвостового узла необходимо использовать CAS.
acquireQueued
Метод в основном используется дляAQSв очередиизузелот вращения Получить статус синхронизации,В этом вращении он не всегда выполняется.,Но будет парк. ожидание
final boolean acquireQueued(final Node node, int arg) {
//Записываем, не удалось ли это сделать
boolean failed = true;
try {
//прервалась ли запись
boolean interrupted = false;
//Повторить попытку в случае неудачи
for (;;) {
//p узел-предшественник
final Node p = node.predecessor();
//еслиузел-предшественникдляголоваузел,и попробуй Получить статус синхронизация возвращается в случае успеха
if (p == head && tryAcquire(arg)) {
//Установим головной узел
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//Если не получилось, ставим отметку и ждем прерывания проверки.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//Отменяем получение, если не удалось
if (failed)
cancelAcquire(node);
}
}
Пытающийся Получить статус синхронизации Перед этим есть условиеp == head && tryAcquire(arg)
:узел-предшественникдаголоваузел
Таким образом, статус приобретения узла в AQS — FIFO.
Но даже если удовлетворенузел-предшественникдаголоваузел,и Нет Это обязательно сработает Получить статус синхронизация прошла успешно, поскольку потоки, которые еще не присоединились к AQS, также могут попробовать статус синхронизация, для достижения нечестных блокировок
Так как же добиться справедливой блокировки?
Просто добавьте это условие, прежде чем пытаться получить статус синхронизации!
Давайте посмотрим еще разshouldParkAfterFailedAcquire
Получить статус синхронизация должна быть припаркована после сбоя
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//Узел-предшественниксостояние
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//Узел-предшественниксостояниедаSIGNAL иллюстрироватьпредшественник Освободить состояние синхронизация вернись, чтобы проснуться Возврат напрямую
return true;
if (ws > 0) {
//Если состояние предшественника больше 0 иллюстрировать было отменено, поэтому продолжайте искать узел, который не был отменен.
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//Ранг после узлов, которые не были отменены
pred.next = node;
} else {
//Передний привод никто не отменял и состояние не СИГНАЛЬНОЕ CASВолясостояниевозобновлятьдляSIGNAL,Освободить состояние синхронизация приближается, чтобы проснуться
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
На самом деле это некоторая подготовка перед парком
Давайте посмотрим еще раз parkAndCheckInterrupt
, используйте класс инструмента, чтобы войти в состояние ожидания, и проверьте, не прерывается ли оно после пробуждения.
private final boolean parkAndCheckInterrupt() {
//Поток входит и ждет состояния...
LockSupport.park(this);
//Проверяем, не прервано ли оно (Бит флага прерывания будет очищен)
return Thread.interrupted();
}
существоватьacquireQueued
изсерединаеслиеще нет Получить статус синхронизация выдает исключение,в конечном итоге будет выполненоcancelAcquire
Отмена
Возвращайте true при обнаружении прерывания,Приходите на первый этажacquire
выполнение методаselfInterrupt
метод,Прервите ветку сами
получить блок-схему:
<!---->
<!---->
действовать первым Освободить состояние синхронизация, после успеха состояние головного узла не равно 0 Разбудите узел, следующее состояние которого не отменено
public final boolean release(int arg) {
//Освободить состояние синхронизации
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//Пробуждение следующего узла, состояние которого не больше 0 (если оно больше 0, оно отменяется)
unparkSuccessor(h);
return true;
}
return false;
}
acquireInterruptibly
используется для Реагировать на прерыванияиз Получить статус синхронизации
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//Проверяем, прервано ли оно, прерывание вызывает исключение
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly
Как и в исходном процессе, исключение прерывания выдается при обнаружении прерывания после пробуждения.
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//Вызываем исключение прерывания, когда обнаруживается, что оно было прервано после пробуждения.
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Когда синхронизация статуса Tweet Реагировать на прерывание прерывается, автоматически генерируется исключение прерывания, а то, которое не отвечает, является собственным прерыванием.
Тайм-аут ответаиз Получить статус синхронизациииспользоватьtryAcquireNanos
Таймаут находится на уровне наносекунд.
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
Это можно увидеть Тайм-аут ответа В то же время это также будет Реагировать на прерывания
doAcquireNanos
Это также похоже на исходный процесс
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//Сколько времени займет тайм-аут?
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
//Тайм-аут истек
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
//больше 1 мс
nanosTimeout > spinForTimeoutThreshold)
//Тайм-аут ожидания
LockSupport.parkNanos(this, nanosTimeout);
//Реагировать на прерывания
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Тайм-аут ответ рассчитает, сколько времени потребуется для тайм-аута во время вращения. Если оно превышает 1 мс, он будет ждать соответствующее время, в противном случае он продолжит вращение. на прерывания
Общий метод позволяет нескольким потокам одновременно получать определенные ресурсы. Например, семафоры и блокировки чтения реализуются с использованием общего метода.
на самом делеобщий Формула и Эксклюзивная церемония похожая процедура, просто попробуйте Tweet статус Различные реализации синхронизации
Мы используем Получить статус синхронизацииизметод Приходитьиллюстрировать
общий Режим Получить статус синхронизациииспользоватьacquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared
пытаться Получить статус синхронизации, параметр arg указывает, сколько Статуса получено синхронизации,Возвращает количество оставшихся доступных данных Получить статус синхронизации
Если осталось, можно получить статус количество синхронизации меньше 0 иллюстрировать еще нетполучатьуспех ВходитьdoAcquireShared
private void doAcquireShared(int arg) {
//Добавляем узел общего стиля
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//получатьузел-предшественник
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//еслиузел-предшественникдляголоваузел и Получить статус синхронизацииуспех Установите головной узел
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//Потеряем неудачу и вступим в вращение, которое будет ждать
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Реагировать на прерывание, тайм-аут и другие методы также аналогичны Эксклюзивной церемонии, но некоторые детали настройки отличаются.
Как уже говорилось выше, AQS выступает в роли очереди блокировки (синхронизации), а Condition выступает в роли очереди ожидания.
Внутренний класс ConditionObject AQS является реализацией Condition. Он действует как очередь ожидания и использует поля для записи начального и хвостового узлов.
public class ConditionObject implements Condition{
//головной узел
private transient Node firstWaiter;
//Хвостовой узел
private transient Node lastWaiter;
}
NextWait используется между узлами, чтобы указать на следующий узел и сформировать односторонний связанный список.
Также доступенawait
рядметод Приходить让当前线程Введите ожидание,signal
рядметод Приходитьбудить
public final void await() throws InterruptedException {
//Реагировать на прерывания
if (Thread.interrupted())
throw new InterruptedException();
//Добавляем в конец Нет необходимости гарантировать атомарность, поскольку если вы можете указать await, вы должны получить ресурс синхронизации.
Node node = addConditionWaiter();
//Отпускаем полученный Статус синхронизации
int savedState = fullyRelease(node);
int interruptMode = 0;
//Если вы не в очереди на синхронизацию, парк войдет и будет ждать.
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//Вращаемся после пробуждения Tweet статус синхронизации
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//Очистка после отмены
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await в основном добавляет узлы в условие По окончании объекта отпустите полученный Статус синхронизация, вход в ожидание, отжим после пробуждения Tweet статус синхронизации
Основная логика сигнала находится в TransferForSignal.
final boolean transferForSignal(Node node) {
//CAS изменяет состояние узла Возврат в случае неудачи аннулироваться
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//Добавить в конец AQS
Node p = enq(node);
int ws = p.waitStatus;
//CASВоляузелсостояние Изменить наSIGNAL В случае успеха разбудите узел
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Сигнал в основном меняет статус с условия -2 на 0 (отменяет узел в случае сбоя), затем добавляет его в конец AQS и, наконец, меняет статус на сигнал -1 и пробуждает узел в случае успеха.
Зачем добавлять его в конец AQS или использовать enq для операции повторной попытки CAS+failed, чтобы гарантировать атомарность?
Поскольку разрешено несколько объектов ConditionObject, одна очередь синхронизации AQS может соответствовать нескольким очередям ожидания (условий).
В этой статье AQS взят за основу и подробно и просто описывает структуру данных, идеи дизайна, процесс получения/освобождения ресурсов синхронизации на уровне исходного кода, условия и т. д., реализованные с помощью AQS.
AQS использует головной и хвостовой узлы для реализации двусторонних очередей.,Предоставляет шаблонные методы для Статус синхронизации и Получить/Освободить состояние синхронизации для реализации блокирующих (синхронизированных) очередей.,и Эти поляиспользоватьvolatileИзменить,Убедитесь, что видимость соответствует читаемой сцене.,Нет необходимости гарантировать атомарность,CAS обычно используется при написании сценариев для обеспечения атомарности.
AQS и Condition используют узлы одного и того же типа. В AQS узлы поддерживаются как двусторонний связанный список, а в Condition узлы поддерживаются как односторонний связанный список. В дополнение к поддержанию указывающих связей узлы также должны поддерживаться. запишите соответствующий статус потока и узла.
AQSточкадля Эксклюзивная церемонияиобщий Режим,использовать Эксклюзивная Разрешается только одна беседа, когда церемония Tweet статус синхронизация, когда использование общего стиля позволяет использовать несколько потоков статус синхронизации;который также обеспечивает Реагировать на прерывания, аналогичные методы ожидания таймаута
Получить статус синхронизации:Первыйпытаться Получить статус синхронизации,В случае неудачи узел будет добавлен в конец AQS с использованием метода повторной попытки CAS+сбой.,жду, чтобы бытьузел-предшественникбудить;только еслиузел-предшественникдляголоваузели Получить статус синхронизацииуспех Только что вернулся,в противном случае Введите ожидание,Продолжайте попытки (вращаться) после пробуждения, если в этот период происходит исключение;,Узел будет отменен перед выдачей исключения
Освободить состояние синхронизации:пытаться Освободить состояние синхронизации,успехназадбудитьназад继еще нет被Отменаизузел
существовать Получить статус При синхронизации флаг прерывания будет проверяться после пробуждения. Если он установлен, Реагировать. на прерывание напрямую вызовет исключение прерывания, в то время как не отвечающее прерывание прервется на самом внешнем уровне.
Тайм-аут ответачас,Хронометраж во время отжима Получить статус синхронизации,если Расстояние суперчасменьше, чем1msСразу Нет Введите ожиданиеиз Вращаться,Если оно больше значения, подождите соответствующее время.
AQS действует как блокирующая очередь,Условие действует как очередь ожидания для реализации шаблона ожидания/уведомления.,AQSизвнутренний классConditionObjectсуществоватьawaitчасвстреча加入Conditionконец и Освободить состояние синхронизация входит в очередь ожидания и крутится после пробуждения (при сбое войдет в ожидание) статус синхронизация; если он один, CAS добавит узел заголовка условия в хвост AQS, а затем активирует его (поскольку один AQS может соответствовать нескольким условиям, поэтому CAS необходим для обеспечения атомарности)
Эта статья включена в рубрику От точки к линии, от линии к поверхности: создавайте систему знаний по параллельному программированию на Java простыми словами.,заинтересованныйиз Студенты могут продолжитьсосредоточиться наох
Примечания и случаи этой статьи включены gitee-StudyJava、 github-StudyJava Заинтересованные студенты могут следить за статистикой и продолжать обращать внимание~
Если у вас есть какие-либо вопросы, вы можете обсудить их в области комментариев. Если вы считаете, что письмо Цай Цая хорошее, вы можете поставить лайк, подписаться на него и собрать его, чтобы поддержать его ~
Следите за Цай Цаем и делитесь более полезной информацией, общедоступный аккаунт: внутренняя кухня Цай Цая.
Я участвую в последнем конкурсе эссе для специального учебного лагеря Tencent Technology Creation 2024. Приходите и разделите со мной приз!