BlockingQueue: примеры (JAVA)
BlockingQueue: interfaceОбщее описание BlockingQueue
Интерфейс java.util.concurrent.BlockingQueue<E> представляет очередь, поддерживающую операции, которые блокируют поток при попытке добавить элемент в полную очередь или извлечь элемент из пустой очереди. Подходит для сценариев «производитель-потребитель», когда требуется синхронизация доступа к очереди между потоками.
Когда применяется
- Организация передачи задач между потоками с ожиданием появления элементов.
- Реализация ограниченного буфера для контроля нагрузки.
- Планирование отложенных задач и приоритетная обработка через специальные реализации.
Ключевые методы интерфейса и их поведение
boolean add(E e)- добавляет элемент немедленно, если место есть; при переполнении бросаетIllegalStateException(неконстантный метод для нелимитированных реализаций может всегда возвращать true).boolean offer(E e)- пытается добавить элемент немедленно; возвращаетtrueпри успехе,falseпри переполнении.boolean offer(E e, long timeout, TimeUnit unit)- пытается добавить элемент в течение указанного времени; возвращаетtrueпри успехе,falseпо таймауту; может бросатьInterruptedException.void put(E e)- блокирующее добавление; ожидает пока освободится место; может бросатьInterruptedException.E take()- блокирующее извлечение первого элемента; ожидает появления элемента; может бросатьInterruptedException.E poll()- немедленное извлечение; возвращаетnullесли очередь пуста.E poll(long timeout, TimeUnit unit)- ждет элемент до заданного времени; возвращает элемент илиnullпо таймауту; может бросатьInterruptedException.E peek()- возвращает элемент без удаления;nullесли пусто.int remainingCapacity()- возвращает количество свободного места для ограниченных реализаций (например,ArrayBlockingQueue), для неограниченных реализаций может возвращать большое значение илиInteger.MAX_VALUE.int drainTo(Collection<? super E> c)иint drainTo(Collection<? super E> c, int maxElements)- массовое извлечение элементов в коллекцию; полезно для пакетной обработки.
Возвращаемые значения и исключения
- Методы типа
offerиaddвозвращают булевый результат по успешности.addможет бросать исключение при отсутствии места. - Методы с таймаутом возвращают результат операции (элемент или булево значение) или специальное значение при таймауте (
falseилиnull). - Операции блокировки (
put,take, таймаутные методы) могут бросатьInterruptedException, если поток прерывается. - Некоторые реализации (например,
PriorityBlockingQueue) не поддерживают порядок FIFO; у этих реализаций поведениеpeek/takeсоответствует порядку приоритета.
Основные реализации
ArrayBlockingQueue- ограниченная очередь на основе массива, обеспечивает порядок FIFO, хороша при фиксированной емкости.LinkedBlockingQueue- может быть ограниченной или неограниченной, основана на связном списке; высокая пропускная способность для параллельных производителей и потребителей.PriorityBlockingQueue- неблокирующая по емкости приоритетная очередь; элементы извлекаются согласно компаратору.DelayQueue- очередь отложенных элементов (элементы становятся доступны после наступления задержки).SynchronousQueue- не хранит элементов; каждыйputдолжен встретить соответствующийtake; удобна для прямой передачи задач.LinkedTransferQueue- поддерживает transfer-операции, позволяет блокирующе ожидать приема элемента потребителем.
Короткие примеры использования
Пример 1: базовый producer/consumer с LinkedBlockingQueue
import java.util.concurrent.*;
public class Example1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> q = new LinkedBlockingQueue<>(2);
q.put("A");
q.put("B");
// следующий put будет ждать освобождения места
new Thread(() -> {
try {
q.put("C");
System.out.println("Добавлен C");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Thread.sleep(200);
System.out.println(q.take());
}
}
A Добавлен C
Пример 2: offer с таймаутом и проверка результата
BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
q.offer(1);
boolean ok = q.offer(2, 1, java.util.concurrent.TimeUnit.SECONDS);
System.out.println(ok);
false
Пример 3: poll и peek
BlockingQueue<String> q = new LinkedBlockingQueue<>();
System.out.println(q.peek());
System.out.println(q.poll());
q.offer("X");
System.out.println(q.peek());
System.out.println(q.poll());
null null X X
Похожие механизмы в Java и их особенности
- ConcurrentLinkedQueue
- BlockingDeque
- SynchronousQueue
- DelayQueue и PriorityBlockingQueue
Неблокирующая очередь без ограничений. Подход для сценариев с низкой задержкой, где не нужен блокирующий характер операций. Не обеспечивает ожидание при пустой очереди.
Двунаправленная блокирующая очередь. Предпочтительнее при необходимости добавления/извлечения с обоих концов (LIFO/сложные стратегии).
Специальный случай очереди без внутреннего буфера для прямой передачи. Полезна при точечной передаче задач между потоками.
Специализированные реализации: первая для задач со временем задержки, вторая для приоритетной обработки. Выбор зависит от требуемого порядка извлечения.
Выбор между этими типами зависит от требований по порядку, ограничению емкости и поведения при отсутствии элементов. Для надежной блокировки и управления емкостью чаще выбирается BlockingQueue; для неблокирующих сценариев с низкой задержкой - ConcurrentLinkedQueue.
Аналоги BlockingQueue в других языках
- Python
Модуль queue.Queue для потоков и asyncio.Queue для корутин. Поведение очень похоже: методы put/get блокируют или поддерживают таймаут.
from queue import Queue
q = Queue(maxsize=2)
q.put(1)
q.put(2)
# следующий put будет ждать
(программа блокируется до освобождения места)
Класс BlockingCollection<T> поверх IProducerConsumerCollection<T>. Поддерживает блокировку, таймауты и завершение добавления.
var bc = new BlockingCollection<int>(2);
bc.Add(1);
bc.Add(2);
// следующий Add блокирует
(ожидание до освобождения места)
Канал (chan) обеспечивает передачу между горутинами; буферизованный канал ведёт себя как ограниченная очередь, неблокирующие операции возможны с использованием select.
ch := make(chan int, 2)
ch <- 1
ch <- 2
// следующий отправитель блокируется
(ожидание до чтения из канала)
В корутинах используется kotlinx.coroutines.channels.Channel и ReceiveChannel/SendChannel. Похож на Go-каналы, поддерживает буфер и блокировку вызовов в корутинах.
val ch = Channel<Int>(2)
runBlocking {
ch.send(1)
ch.send(2)
// дальше send приостановится
}
(корутина приостановится до recv)
Нет встроенной блокирующей очереди; симуляция через промисы, async/await или библиотеки (например, async.queue). В среде с потоками можно использовать Atomics/SharedArrayBuffer или worker_threads для синхронизации.
// Простейшая очередь на промисах
class AsyncQueue {
constructor(){ this._items = []; this._resolvers = []; }
push(v){ if(this._resolvers.length) this._resolvers.shift()(v); else this._items.push(v); }
async pop(){ if(this._items.length) return this._items.shift(); return await new Promise(r => this._resolvers.push(r)); }
}
(pop будет ждать, если очередь пуста)
В стандартной установке нет потоко-безопасной блокирующей очереди; в расширениях pthreads или параллельных библиотеках предоставляются примитивы или очереди через разделяемую память.
Отличия от Java: в большинстве языков блокирующие очереди встроены в стандартную библиотеку (Python, C#) либо реализуются через каналы (Go, Kotlin). В Java доступны разные реализации с хорошо продуманной семантикой таймаутов и прерываний.
Типичные ошибки при работе с BlockingQueue
- Игнорирование InterruptedException
Многие методы бросают InterruptedException. Частая ошибка - пустой catch без восстановления флага прерывания.
try {
queue.put(x);
} catch (InterruptedException e) {
// Неправильно: ничего не делается
}
(поток теряет информацию о прерывании)
Правильнее: восстановить флаг Thread.currentThread().interrupt() или корректно обработать прерывание.
Применение неограниченной очереди (например, LinkedBlockingQueue без ограничения) в случае чрезмерного поступления задач ведёт к потреблению всего доступного хипа.
Например, ожидание результата offer как у put - offer может просто вернуть false, а put блокирует. Неправильная проверка булевых возвращаемых значений приводит к потере данных.
Неправильная логика нескольких очередей и блокировок приводит к дедлоку, например, если один поток ждёт освобождения первой очереди и одновременно держит ресурс второй.
// Упрощённый пример дедлока: два потока и две очереди
BlockingQueue<Integer> q1 = new ArrayBlockingQueue<>(1);
BlockingQueue<Integer> q2 = new ArrayBlockingQueue<>(1);
// Поток A: q1.put(1); q2.put(2);
// Поток B: q2.put(3); q1.put(4);
// При одновременном выполнении возможно взаимное ожидание
(оба потока блокируются)
Изменения и эволюция в JDK
- История
- Новые реализации и дополнения
- Совместимость
Интерфейс BlockingQueue введён в JDK 5 в пакете java.util.concurrent как часть набора утилит для конкурентного программирования.
В последующих версиях JDK появились дополнительные реализации и структуры, расширяющие возможности: DelayQueue, PriorityBlockingQueue, LinkedTransferQueue и другие. Интерфейс как таковой не претерпел значительных изменений и остаётся стабильным API.
Методы и семантика сохраняются обратно совместимыми, улучшения в JDK касаются производительности и оптимизации конкретных реализаций.
Расширенные и нестандартные примеры
Пример A: DrainTo для пакетной записи в базу
BlockingQueue<String> q = new LinkedBlockingQueue<>();
// Наполнение
q.offer("a"); q.offer("b"); q.offer("c");
List<String> batch = new ArrayList<>();
int drained = q.drainTo(batch, 2); // забирает до 2 элементов
System.out.println(drained + ", batch=" + batch);
System.out.println("Осталось: " + q.size());
2, batch=[a, b] Осталось: 1
Пример B: DelayQueue для планировщика задач
import java.util.concurrent.*;
class Task implements Delayed {
private final long startTime;
private final String name;
Task(String name, long delayMs){ this.name = name; this.startTime = System.currentTimeMillis() + delayMs; }
public long getDelay(TimeUnit unit){ return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
public int compareTo(Delayed o){ return Long.compare(this.startTime, ((Task)o).startTime); }
public String toString(){ return name; }
}
public class DelayExample{
public static void main(String[] args) throws InterruptedException{
DelayQueue<Task> dq = new DelayQueue<>();
dq.put(new Task("T1", 500));
dq.put(new Task("T2", 100));
System.out.println(dq.take());
System.out.println(dq.take());
}
}
T2 T1
Пример C: SynchronousQueue для хэнд-оффа
import java.util.concurrent.*;
public class SyncQ {
public static void main(String[] args) {
SynchronousQueue<String> sq = new SynchronousQueue<>();
new Thread(() -> {
try { System.out.println("Producer: putting"); sq.put("DATA"); System.out.println("Producer: done"); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}).start();
new Thread(() -> {
try { Thread.sleep(200); System.out.println("Consumer: " + sq.take()); }
catch (Exception e) { }
}).start();
}
}
Producer: putting Consumer: DATA Producer: done
Пример D: LinkedTransferQueue и немедленная передача
import java.util.concurrent.*;
public class TransferExample{
public static void main(String[] args) throws InterruptedException{
LinkedTransferQueue<String> ltq = new LinkedTransferQueue<>();
new Thread(() -> {
try { System.out.println("Producer waiting to transfer"); ltq.transfer("P"); System.out.println("Producer transferred"); }
catch (InterruptedException e){ Thread.currentThread().interrupt(); }
}).start();
Thread.sleep(200);
System.out.println("Consumer: " + ltq.take());
}
}
Producer waiting to transfer Consumer: P Producer transferred
Пример E: управление нагрузкой - bounded queue и thread pool
ThreadPoolExecutor exec = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
// при перегрузке задачи будут либо отклоняться, либо блокировать отправителя в зависимости от RejectedExecutionHandler
(task submission может быть отклонено при полном пуле и настроенном handler)
Эти примеры иллюстрируют разные способы применения блокирующих очередей от простых паттернов до более редких случаев прямой передачи и отложенных задач.