Неблокирующие concurrent очереди
Пакет java.util.concurrent для формирования неблокирующих очередей с поддержкой многопоточности включает два класса на связанных узлах :
Классы формирования неблокирующих очередей реализуют интерфейсы Queue и Deque.
На странице представлены два примера использования неблокирующих очередей :
Интерфейс Queue
Обобщенный интерфейс Queue расширяет базовый интерфейс коллекции Collection и определяет поведение класса в качестве однонаправленной очереди. Функциональность интерфейса определяется следующими методами :
В таблице методы интерфейса Queue систематизированы.
| Действие | Вызов исключения | Возврат значения |
|---|---|---|
| Вставить | add(e) | offer(e) |
| Удалить | remove() | poll () |
| Получить | element() | peek () |
Метод offer предназначен для использования, когда отказ в размещения элемента в очереди является нормальным явлением, а не исключением, например, с фиксированной емкостью.
Методы remove() и poll() отличаются только по их поведению при пустой очереди : метод remove() выдает исключение, в то время как метод poll() возвращает null.
Методы element() и peek() возвращают элемент из головы очереди, но не удаляют его.
Реализации интерфейса Queue обычно не позволяют вставку элементов null, хотя некоторые его реализации типа LinkedList не запрещают вставку null. Даже в реализациях, которые допускают это, null не должен быть вставлен в Queue, поскольку null также используется в качестве специального возвращаемого значения методом poll, чтобы указать, что очередь не содержит элементов.
Полная документация по интерфейсу Queue представлена здесь.
Очередь ConcurrentLinkedQueue
Класс ConcurrentLinkedQueue формирует неблокирующую, основанную на связанных узлах и ориентированную на многопоточное исполнение очередь. Размер очереди ConcurrentLinkedQueue не имеет ограничений. Эта очередь реализует принцип FIFO — «первым прибыл, первым убыл». Новые элементы размещаются в хвосте очереди, а операции извлечения получают элементы из головы очереди. Этот класс не разрешает использование null элементов.
Очередь ConcurrentLinkedQueue использует эффективный неблокирующий алгоритм «без ожидания» предложенный Мэджедом М. Майклом и Майклом Л. Скоттом (Maged M. Michael, Michael L. Scott).
Итераторы класса отражают состояние очереди на определенный момент времени его создания и не вызывают ConcurrentModificationException. Содержавшиеся в очереди элементы с начала создания iterator’a, будут возвращены точно по запросу.
Аккуратно используйте метод size, который в отличие от большиства наборов, не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов.
Очередь ConcurrentLinkedQueue и iterator реализуют все дополнительные методы интерфейсов Queue и Iterator.
Конструкторы ConcurrentLinkedQueue
Класс ConcurrentLinkedQueue имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator’a.
Полная документация по неблокирующей очереди ConcurrentLinkedQueue представлена здесь.
Пример очереди ConcurrentLinkedQueue
В примере использования неблокирующей очереди ConcurrentLinkedQueueExample создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Таким образом, очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую очередь ConcurrentLinkedQueue.
Выполнение примера
При выполнении примера сообщения, представленные ниже, выводятся в консоль. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью.
Интерфейс Deque
Интерфейс Deque расширяет (extends) свойства интерфейса Queue и обеспечивает двусторонний доступ к элементам очереди. Большинство реализаций Deque не устанавливают ограничение на количество элементов, которое они могут содержать в очереди. Но этот интерфейс поддерживает ограниченные емкостью двухсторонние очереди также, как и очереди без определения фиксированного размера.
Интерфейс Deque включает методы доступа к элементам в обоих концах двухсторонней очереди. Эти методы обеспечивают вставку, удаление и извлечения элемента. Каждый из этих методов существует в двух формах: один вызывает исключение, если выполнение невозможно, другой возвращает специальное значение (null или false). Методы интерфейса Deque представлены в таблице.
| Первый элемент (голова) | Последний элемент (хвост) | |||
| С исключением | Получение значения | С исключением | Получение значения | |
| Вставить | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
| Удалить | removeFirst(e) | pollFirst(e) | removeLast(e) | pollLast(e) |
| Исследовать | getFirst(e) | peekFirst(e) | getLast(e) | peekLast(e) |
Наследованные от интерфейса Queue следующие методы эквивалентны методам Deque :
| Метод Queue | Эквивалентный метод Deque |
| add(e) | addLast(e) |
| offer(e) | offerLast(e) |
| remove() | removeFirst() |
| poll() | pollFirst() |
| element() | getFirst() |
| peek() | peekFirst() |
Двухсторонние очереди Deque могут быть использованы как LIFO (last-in-first-out) стек (Stack). Данный интерфейс должен иметь предпочтение по отношению к классу Stack. Когда двухсторонняя очередь используется в качестве стека, элементы вставляются и выталкиваются из начала двухсторонней очереди. Методы Stack’a эквивалентны следующим методам Deque :
| Метод Stack’a | Эквивалентный метод Deque |
| push(e) | addFirst(e) |
| pop() | removeFirst() |
| peek() | peekFirst() |
Метод peek одинаково хорошо работает при использовании Deque в качестве двухсторонней очереди или стека; в любом случае элементы извлекаются из начала двухсторонней очереди.
Для удаления внутренних элементов Deque имеет два метода : removeFirstOccurrence и removeLastOccurrence.
В отличие от List этот интерфейс не оказывает поддержку индексному доступу к элементам.
Полная документация по интерфейсу Deque представлена здесь.
Очередь ConcurrentLinkedDeque
Неограниченная по емкости двухсторонняя очередь ConcurrentLinkedDeque, основанная на связанных узлах, реализует интерфейс Deque. Очередь обеспечивает безопасные операции размещения, удаления и доступа параллельно выполняющимся потокам. То есть, ConcurrentLinkedDeque можно использовать нескольким потокам одновременно для совместного доступа к общему набору. Данный класс не разрешает размещение null элементов в очереди.
Итераторы класса отражают состояние двухсторонней очереди на момент их создания, не вызывают ConcurrentModificationException и могут использоваться одновременно с другими операциями.
Также, как и с интерфейсом Queue, аккуратно используйте метод size, который не является constant-time operation. Из-за асинхронной природы этой очереди, определение текущего количества элементов может быть не точным и требует обхода элементов, если этот набор изменяется во время обхода. Объемные операции addAll, removeAll, retainAll, containsAll, equals и toArray не гарантируют, что будут выполнены атомарно. Например, iterator, работающий с методом addAll, мог бы просмотреть только некоторые из добавленных элементов.
Конструкторы ConcurrentLinkedDeque
Класс ConcurrentLinkedDeque имеет два конструктора. Первый конструктор без параметров создает пустую очередь. Второй конструктор создает очередь и включает в неё элементы набора, добавленные в порядке обхода iterator’a.
Полная документация по неблокирующей очереди ConcurrentLinkedDeque представлена здесь.
Пример очереди ConcurrentLinkedDeque
В примере использования неблокирующей очереди ConcurrentLinkedDeque создаются два потока. Первый поток Producer с задержками в 200 мс помещает в очередь 10 текстовых сообщений. В зависимости от размера очереди (четное/нечетное) элемент помещается либо в хвост очереди, либо в начало. Второй поток Consumer проверяет наполнение очереди и извлекает из очереди сообщения с задержкой 500 мс. Consumer также проверяет четность размера очереди и извлекает элемент либо из головы, либо из хвоста очереди.
Так как задержки в Producer меньше, чем в Consumer, то очередь наполняется быстрее, чем опустошается. Пример демонстрирует, каким образом можно использовать неблокирующую двухстороннюю очередь ConcurrentLinkedDeque.
Выполнение примера
При выполнении примера в консоль выводятся представленные ниже сообщения. Последовательность сообщений обусловлена временны́ми задержками в потоках при выполнении операций с очередью.
Concurrent Linked Queue Class
Definition
Some information relates to prerelease product that may be substantially modified before it’s released. Microsoft makes no warranties, express or implied, with respect to the information provided here.
An unbounded thread-safe Queue queue based on linked nodes.
Remarks
Portions of this page are modifications based on work created and shared by the Android Open Source Project and used according to terms described in the Creative Commons 2.5 Attribution License.
Constructors
Creates a ConcurrentLinkedQueue that is initially empty.
Creates a ConcurrentLinkedQueue that is initially empty.
A constructor used when creating managed representations of JNI objects; called by the runtime.
Properties
The handle to the underlying Android instance.
(Inherited from AbstractCollection)
This API supports the Mono for Android infrastructure and is not intended to be used directly from your code.
This API supports the Mono for Android infrastructure and is not intended to be used directly from your code.
Methods
Creates and returns a copy of this object.
(Inherited from AbstractCollection)
Retrieves, but does not remove, the head of this queue.
Indicates whether some other object is «equal to» this one.
Returns a hash code value for the object.
Returns an iterator over the elements in this queue in proper sequence.
Called by the garbage collector on an object when garbage collection determines that there are no more references to the object.
Wakes up a single thread that is waiting on this object’s monitor.
Wakes up all threads that are waiting on this object’s monitor.
Inserts the specified element at the tail of this queue.
Retrieves, but does not remove, the head of this queue, or returns null if this queue is empty.
Retrieves and removes the head of this queue, or returns null if this queue is empty.
Retrieves and removes the head of this queue.
Sets the Handle property.
Returns the number of elements in this queue.
Returns a Spliterator over the elements in this queue.
(Inherited from AbstractCollection)
Returns a string representation of the object.
(Inherited from Object)
Causes the current thread to wait until another thread invokes the java.lang.Object#notify() method or the java.lang.Object#notifyAll() method for this object.
Causes the current thread to wait until another thread invokes the java.lang.Object#notify() method or the java.lang.Object#notifyAll() method for this object.
Causes the current thread to wait until another thread invokes the java.lang.Object#notify() method or the java.lang.Object#notifyAll() method for this object.
Explicit Interface Implementations
| IJavaPeerable.Disposed() | (Inherited from Object) |
| IJavaPeerable.DisposeUnlessReferenced() | (Inherited from Object) |
| IJavaPeerable.Finalized() | (Inherited from Object) |
| IJavaPeerable.JniManagedPeerState | (Inherited from Object) |
| IJavaPeerable.SetJniIdentityHashCode(Int32) | (Inherited from Object) |
| IJavaPeerable.SetJniManagedPeerState(JniManagedPeerStates) | (Inherited from Object) |
| IJavaPeerable.SetPeerReference(JniObjectReference) | (Inherited from Object) |
Extension Methods
Performs an Android runtime-checked type conversion.
Русские Блоги
Подробное объяснение ConcurrentLinkedQueue в параллельном программировании на Java
Введение
Диаграмма классов ConcurrentLinkedQueue выглядит следующим образом:
ConcurrentLinkedQueue состоит из головного узла и хвостового узла. Каждый узел (узел) состоит из элемента узла (элемента) и ссылки на следующий узел (следующий). Узлы и узлы соединяются через него рядом, чтобы сформировать связанный список. Структура очереди.
Подробное объяснение исходного кода ConcurrentLinkedQueue
Как мы уже упоминали ранее, все узлы ConcurrentLinkedQueue являются типами узлов:
Класс Node также относительно прост и не будет объясняться. Класс ConcurrentLinkedQueue имеет следующие два метода конструирования:
По умолчанию элементы, хранящиеся в головном узле, пусты, а хвостовой узел равен головному узлу.
Ниже мы в основном рассмотрим операции enqueue и dequeue ConcurrentLinkedQueue.
Операция постановки в очередь
Постановка в очередь означает добавление узла постановки в очередь в конец очереди. Чтобы облегчить понимание изменений в очереди при входе в очередь, а также изменений в головном узле и хвостовом узле, я сделал снимок очереди для каждого добавленного узла:
Процесс добавления элементов, показанный на рисунке выше, выглядит следующим образом:
Операция постановки в очередь в основном выполняет две функции: во-первых, установка узла постановки в очередь на следующий узел текущего хвостового узла очереди. Во-вторых, необходимо обновить хвостовой узел. Если следующий узел хвостового узла не пуст, установите узел очереди на хвостовой узел. Если следующий узел хвостового узла пуст, установите узел очереди на следующий узел хвоста, поэтому tail Узлы не всегда являются хвостовыми узлами, и это важно понимать.
Вышеприведенный анализ позволяет нам понять процесс постановки в очередь с точки зрения постановки в очередь одного потока, но ситуация с множественным постановкой в очередь одновременно становится более сложной, поскольку другие потоки могут попасть в очередь. Если поток ставит в очередь, он должен сначала получить хвостовой узел, а затем установить следующий узел хвостового узла в качестве узла постановки в очередь, но в это время другой поток может быть вставлен в очередь, и хвостовой узел очереди изменится. В это время текущий поток должен приостановить операцию постановки в очередь, а затем повторно получить хвостовой узел.
Давайте посмотрим на метод добавления (E e) enqueue ConcurrentLinkedQueue:
С точки зрения исходного кода весь процесс регистрации в основном делает две вещи:
Хвостовой узел не обязательно является целью проекта хвостового узла.
Пусть хвостовой узел всегда будет хвостовым узлом очереди, так что объем кода реализации очень мал, а логика очень ясна и проста для понимания. Однако это имеет недостаток, заключающийся в том, что необходимо постоянно обновлять хвостовой узел с использованием циклического CAS. Если количество раз, которое CAS обновляет хвостовой узел, может быть уменьшено, эффективность постановки в очередь может быть улучшена.
В реализации JDK 1.7 doug lea использует переменную hops для управления и уменьшения частоты обновления хвостового узла.Он не обновляет хвостовой узел до хвостового узла каждый раз, когда узел присоединяется к команде, но когда расстояние между хвостовым узлом и хвостовым узлом больше, чем Хвостовой узел обновляется только в том случае, если он равен значению постоянной HOPS (значение по умолчанию равно 1.) Чем больше расстояние между хвостом и хвостовым узлом, тем меньше раз будет обновляться хвостовой узел с помощью CAS, но отрицательный эффект, вызванный тем, что это расстояние увеличивается каждый раз, когда вы входите в очередь Чем больше времени требуется для определения местоположения хвостового узла, потому что тело цикла должно выполнить еще один цикл для определения местоположения хвостового узла, но это все же может повысить эффективность постановки в очередь, потому что это существенно уменьшает количество операций чтения для изменчивых переменных. Операция записи энергозависимых переменных и стоимость операции записи энергозависимых переменных намного больше, чем операция чтения, поэтому эффективность постановки в очередь будет улучшена.
В реализации JDK 1.8 время обновления хвоста оценивается по тому, равны ли p и t. Результат реализации такой же, как в JDK 1.7, то есть когда расстояние между хвостовым узлом и хвостовым узлом больше или равно 1, хвост обновляется.
Общая логика операции постановки в очередь ConcurrentLinkedQueue показана на следующем рисунке:
Операция дежурного
Что выходит из очереди, так это вернуть элемент узла из очереди и очистить ссылку узла на этот элемент. Давайте посмотрим на изменения головного узла через снимок каждого узла в очереди:
Как видно из приведенного выше рисунка, головной узел не обновляется каждый раз, когда вы покидаете команду. Когда в головном узле есть элементы, элементы в головном узле напрямую вызываются без обновления головного узла. Только когда в головном узле нет элементов, операция удаления обновит головной узел. Этот метод также используется для уменьшения потребления CAS для обновления головного узла, тем самым повышая эффективность удаления из очереди. Давайте использовать исходный код для дальнейшего анализа процесса удаления очереди.
Основная логика этого метода заключается в том, чтобы сначала получить элемент головного узла, а затем определить, является ли элемент головного узла пустым. Если он пуст, это означает, что другой поток выполнил операцию удаления из очереди для удаления элемента узла. Если он не пуст, Затем используйте CAS, чтобы установить для ссылки на головной узел значение NULL. Если CAS успешен, он будет непосредственно возвращать элемент головного узла. Если он неуспешен, это означает, что другой поток выполнил операцию удаления из очереди и обновил головной узел, что привело к возникновению элемента. Изменить, нужно повторно приобрести головной узел.
В операциях enqueue и dequeue бывают ситуации, когда p == q. Как это происходит? Давайте посмотрим на такую операцию:
После того, как узел появляется, у хвостового узла есть пунктирная линия, указывающая на себя. Что это значит? Давайте посмотрим на метод poll (). В этом методе после удаления элемента вызывается метод updateHead:
Мы можем видеть, что после обновления заголовка следующее поле старого узла заголовка h будет указывать на h, и пунктирная линия, показанная на рисунке выше, также представляет собой собственную ссылку на этот узел.
Если в это время существует другой поток для добавления элементов, следующий узел, полученный через tail, все еще сам по себе, что вызовет p == q. После возникновения такой ситуации он вызовет выполнение обновления заголовка, которое будет Узел p перенаправляется на головку, и все «живые» узлы (относящиеся к неподброшенным узлам) могут быть достигнуты из головы через обход, так что хвостовой узел может быть успешно получен через голову, а затем добавлены элементы.
Другие связанные методы
метод peek ()
Как видно из исходного кода, операция peek изменит точку заголовка. После выполнения метода peek () головка будет указывать на первый узел с непустым элементом.
метод size ()
Метод size () используется для получения количества элементов в текущей очереди, но в параллельной среде результат может быть неточным, поскольку весь процесс не заблокирован, поэтому можно добавлять или удалять элементы с момента вызова метода size для возвращаемого результата, что приводит к статистике Количество элементов не является точным.
метод удаления (Object o)
Содержит (Object o) метод
Этот метод похож на метод size и может возвращать неверные результаты. Например, когда метод вызывается, элемент все еще находится в очереди, но во время процесса обхода элемент удаляется, и возвращается false.
подводить итоги
Реализация неблокирующего алгоритма ConcurrentLinkedQueue может быть суммирована как следующие 5 пунктов:
Ссылка
Фан Тенгфей: «Искусство параллельного программирования на Java»
Tech Tutorials
Tutorials and posts about Java, Spring, Hadoop and many more. Java code examples and interview questions. Spring code examples.
Saturday, April 10, 2021
ConcurrentLinkedQueue in Java With Examples
ConcurrentLinkedQueue in Java implements Queue interface and it was added in Java 5 along with other concurrent utilities like CyclicBarrier, CountDownLatch, Semaphore, ConcurrentHashMap etc.
ConcurrentLinkedQueue in Java is an unbounded thread-safe queue which stores its elements as linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been on the queue the longest time. The tail of the queue is the element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
Like most other concurrent collection implementations, ConcurrentLinkedQueue class does not permit the use of null elements.
Usage of ConcurrentLinkedQueue in Java
A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Note that it doesn’t block operations as it is done in the implementations of BlockingQueue interface like ArrayBlockingQueue, LinkedBlockingQueue. So there are no put() or take() methods which will wait if required.
Java ConcurrentLinkedQueue constructors
Java ConcurrentLinkedQueue Iterator
Iterators in ConcurrentLinkedQueue are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
ConcurrentLinkedQueue Java Example
Let’s create a producer consumer Java program using ConcurrentLinkedQueue. In this code there will be one producer thread putting element into the queue and three consumer threads retrieving elements from the queue. Note that producer thread will put only 3 elements.
You can run the program multiple times and observe that two consumer threads are not retrieving the same element from the queue.
Later ConcurrentLinkedQueue is replaced with PriorityQueue which is not synchronized. Running this code will result in consumer threads getting into a race condition and overstepping on each other. Note that sometimes you may get correct output also but in multiple runs you are bound to get incorrect results.
Now let’s replace the ConcurrentLinkedQueue with PriorityQueue which is not synchronized.
Here it can be seen Consumer threads are getting the same element out of the queue.
That’s all for this topic ConcurrentLinkedQueue in Java With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!








