BlockingQueue: примеры (JAVA)

Разбор принципов работы BlockingQueue
Раздел: Многопоточность, Коллекции
BlockingQueue: interface

Общее описание BlockingQueue

Интерфейс java.util.concurrent.BlockingQueue<E> представляет очередь, поддерживающую операции, которые блокируют поток при попытке добавить элемент в полную очередь или извлечь элемент из пустой очереди. Подходит для сценариев «производитель-потребитель», когда требуется синхронизация доступа к очереди между потоками.

Когда применяется

  • Организация передачи задач между потоками с ожиданием появления элементов.
  • Реализация ограниченного буфера для контроля нагрузки.
  • Планирование отложенных задач и приоритетная обработка через специальные реализации.

Ключевые методы интерфейса и их поведение

  • boolean add(E e) - добавляет элемент немедленно, если место есть; при переполнении бросает IllegalStateException (неконстантный метод для нелимитированных реализаций может всегда возвращать true).
  • boolean offer(E e) - пытается добавить элемент немедленно; возвращает true при успехе, false при переполнении.
  • boolean offer(E e, long timeout, TimeUnit unit) - пытается добавить элемент в течение указанного времени; возвращает true при успехе, false по таймауту; может бросать InterruptedException.
  • void put(E e) - блокирующее добавление; ожидает пока освободится место; может бросать InterruptedException.
  • E take() - блокирующее извлечение первого элемента; ожидает появления элемента; может бросать InterruptedException.
  • E poll() - немедленное извлечение; возвращает null если очередь пуста.
  • E poll(long timeout, TimeUnit unit) - ждет элемент до заданного времени; возвращает элемент или null по таймауту; может бросать InterruptedException.
  • E peek() - возвращает элемент без удаления; null если пусто.
  • int remainingCapacity() - возвращает количество свободного места для ограниченных реализаций (например, ArrayBlockingQueue), для неограниченных реализаций может возвращать большое значение или Integer.MAX_VALUE.
  • int drainTo(Collection<? super E> c) и int drainTo(Collection<? super E> c, int maxElements) - массовое извлечение элементов в коллекцию; полезно для пакетной обработки.

Возвращаемые значения и исключения

  • Методы типа offer и add возвращают булевый результат по успешности. add может бросать исключение при отсутствии места.
  • Методы с таймаутом возвращают результат операции (элемент или булево значение) или специальное значение при таймауте (false или null).
  • Операции блокировки (put, take, таймаутные методы) могут бросать InterruptedException, если поток прерывается.
  • Некоторые реализации (например, PriorityBlockingQueue) не поддерживают порядок FIFO; у этих реализаций поведение peek/take соответствует порядку приоритета.

Основные реализации

  • ArrayBlockingQueue - ограниченная очередь на основе массива, обеспечивает порядок FIFO, хороша при фиксированной емкости.
  • LinkedBlockingQueue - может быть ограниченной или неограниченной, основана на связном списке; высокая пропускная способность для параллельных производителей и потребителей.
  • PriorityBlockingQueue - неблокирующая по емкости приоритетная очередь; элементы извлекаются согласно компаратору.
  • DelayQueue - очередь отложенных элементов (элементы становятся доступны после наступления задержки).
  • SynchronousQueue - не хранит элементов; каждый put должен встретить соответствующий take; удобна для прямой передачи задач.
  • LinkedTransferQueue - поддерживает transfer-операции, позволяет блокирующе ожидать приема элемента потребителем.

Короткие примеры использования

Пример 1: базовый producer/consumer с LinkedBlockingQueue

import java.util.concurrent.*;

public class Example1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        q.put("A");
        q.put("B");
        // следующий put будет ждать освобождения места
        new Thread(() -> {
            try {
                q.put("C");
                System.out.println("Добавлен C");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        Thread.sleep(200);
        System.out.println(q.take());
    }
}
A
Добавлен C

Пример 2: offer с таймаутом и проверка результата

BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
q.offer(1);
boolean ok = q.offer(2, 1, java.util.concurrent.TimeUnit.SECONDS);
System.out.println(ok);
false

Пример 3: poll и peek

BlockingQueue<String> q = new LinkedBlockingQueue<>();
System.out.println(q.peek());
System.out.println(q.poll());
q.offer("X");
System.out.println(q.peek());
System.out.println(q.poll());
null
null
X
X

Похожие механизмы в Java и их особенности

  • ConcurrentLinkedQueue
  • Неблокирующая очередь без ограничений. Подход для сценариев с низкой задержкой, где не нужен блокирующий характер операций. Не обеспечивает ожидание при пустой очереди.

  • BlockingDeque
  • Двунаправленная блокирующая очередь. Предпочтительнее при необходимости добавления/извлечения с обоих концов (LIFO/сложные стратегии).

  • SynchronousQueue
  • Специальный случай очереди без внутреннего буфера для прямой передачи. Полезна при точечной передаче задач между потоками.

  • DelayQueue и PriorityBlockingQueue
  • Специализированные реализации: первая для задач со временем задержки, вторая для приоритетной обработки. Выбор зависит от требуемого порядка извлечения.

Выбор между этими типами зависит от требований по порядку, ограничению емкости и поведения при отсутствии элементов. Для надежной блокировки и управления емкостью чаще выбирается BlockingQueue; для неблокирующих сценариев с низкой задержкой - ConcurrentLinkedQueue.

Аналоги BlockingQueue в других языках

  • Python
  • Модуль queue.Queue для потоков и asyncio.Queue для корутин. Поведение очень похоже: методы put/get блокируют или поддерживают таймаут.

    from queue import Queue
    q = Queue(maxsize=2)
    q.put(1)
    q.put(2)
    # следующий put будет ждать
    (программа блокируется до освобождения места)
  • C#
  • Класс BlockingCollection<T> поверх IProducerConsumerCollection<T>. Поддерживает блокировку, таймауты и завершение добавления.

    var bc = new BlockingCollection<int>(2);
    bc.Add(1);
    bc.Add(2);
    // следующий Add блокирует
    (ожидание до освобождения места)
  • Go
  • Канал (chan) обеспечивает передачу между горутинами; буферизованный канал ведёт себя как ограниченная очередь, неблокирующие операции возможны с использованием select.

    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    // следующий отправитель блокируется
    (ожидание до чтения из канала)
  • Kotlin
  • В корутинах используется kotlinx.coroutines.channels.Channel и ReceiveChannel/SendChannel. Похож на Go-каналы, поддерживает буфер и блокировку вызовов в корутинах.

    val ch = Channel<Int>(2)
    runBlocking {
      ch.send(1)
      ch.send(2)
      // дальше send приостановится
    }
    (корутина приостановится до recv)
  • JavaScript (Node.js)
  • Нет встроенной блокирующей очереди; симуляция через промисы, async/await или библиотеки (например, async.queue). В среде с потоками можно использовать Atomics/SharedArrayBuffer или worker_threads для синхронизации.

    // Простейшая очередь на промисах
    class AsyncQueue {
      constructor(){ this._items = []; this._resolvers = []; }
      push(v){ if(this._resolvers.length) this._resolvers.shift()(v); else this._items.push(v); }
      async pop(){ if(this._items.length) return this._items.shift(); return await new Promise(r => this._resolvers.push(r)); }
    }
    
    (pop будет ждать, если очередь пуста)
  • PHP
  • В стандартной установке нет потоко-безопасной блокирующей очереди; в расширениях pthreads или параллельных библиотеках предоставляются примитивы или очереди через разделяемую память.

Отличия от Java: в большинстве языков блокирующие очереди встроены в стандартную библиотеку (Python, C#) либо реализуются через каналы (Go, Kotlin). В Java доступны разные реализации с хорошо продуманной семантикой таймаутов и прерываний.

Типичные ошибки при работе с BlockingQueue

  • Игнорирование InterruptedException
  • Многие методы бросают InterruptedException. Частая ошибка - пустой catch без восстановления флага прерывания.

    try {
        queue.put(x);
    } catch (InterruptedException e) {
        // Неправильно: ничего не делается
    }
    (поток теряет информацию о прерывании)

    Правильнее: восстановить флаг Thread.currentThread().interrupt() или корректно обработать прерывание.

  • Использование ненадёжной реализации для ограничений
  • Применение неограниченной очереди (например, LinkedBlockingQueue без ограничения) в случае чрезмерного поступления задач ведёт к потреблению всего доступного хипа.

  • Смешивание методов с различными семантиками
  • Например, ожидание результата offer как у put - offer может просто вернуть false, а put блокирует. Неправильная проверка булевых возвращаемых значений приводит к потере данных.

  • Возможность взаимной блокировки
  • Неправильная логика нескольких очередей и блокировок приводит к дедлоку, например, если один поток ждёт освобождения первой очереди и одновременно держит ресурс второй.

    // Упрощённый пример дедлока: два потока и две очереди
    BlockingQueue<Integer> q1 = new ArrayBlockingQueue<>(1);
    BlockingQueue<Integer> q2 = new ArrayBlockingQueue<>(1);
    // Поток A: q1.put(1); q2.put(2);
    // Поток B: q2.put(3); q1.put(4);
    // При одновременном выполнении возможно взаимное ожидание
    (оба потока блокируются)

Изменения и эволюция в JDK

  • История
  • Интерфейс BlockingQueue введён в JDK 5 в пакете java.util.concurrent как часть набора утилит для конкурентного программирования.

  • Новые реализации и дополнения
  • В последующих версиях JDK появились дополнительные реализации и структуры, расширяющие возможности: DelayQueue, PriorityBlockingQueue, LinkedTransferQueue и другие. Интерфейс как таковой не претерпел значительных изменений и остаётся стабильным API.

  • Совместимость
  • Методы и семантика сохраняются обратно совместимыми, улучшения в JDK касаются производительности и оптимизации конкретных реализаций.

Расширенные и нестандартные примеры

Пример A: DrainTo для пакетной записи в базу

Пример java
BlockingQueue<String> q = new LinkedBlockingQueue<>();
// Наполнение
q.offer("a"); q.offer("b"); q.offer("c");
List<String> batch = new ArrayList<>();
int drained = q.drainTo(batch, 2); // забирает до 2 элементов
System.out.println(drained + ", batch=" + batch);
System.out.println("Осталось: " + q.size());
2, batch=[a, b]
Осталось: 1

Пример B: DelayQueue для планировщика задач

Пример java
import java.util.concurrent.*;

class Task implements Delayed {
    private final long startTime;
    private final String name;
    Task(String name, long delayMs){ this.name = name; this.startTime = System.currentTimeMillis() + delayMs; }
    public long getDelay(TimeUnit unit){ return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
    public int compareTo(Delayed o){ return Long.compare(this.startTime, ((Task)o).startTime); }
    public String toString(){ return name; }
}

public class DelayExample{
  public static void main(String[] args) throws InterruptedException{
    DelayQueue<Task> dq = new DelayQueue<>();
    dq.put(new Task("T1", 500));
    dq.put(new Task("T2", 100));
    System.out.println(dq.take());
    System.out.println(dq.take());
  }
}
T2
T1

Пример C: SynchronousQueue для хэнд-оффа

Пример java
import java.util.concurrent.*;

public class SyncQ {
  public static void main(String[] args) {
    SynchronousQueue<String> sq = new SynchronousQueue<>();
    new Thread(() -> {
      try { System.out.println("Producer: putting"); sq.put("DATA"); System.out.println("Producer: done"); }
      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }).start();
    new Thread(() -> {
      try { Thread.sleep(200); System.out.println("Consumer: " + sq.take()); }
      catch (Exception e) { }
    }).start();
  }
}
Producer: putting
Consumer: DATA
Producer: done

Пример D: LinkedTransferQueue и немедленная передача

Пример java
import java.util.concurrent.*;

public class TransferExample{
  public static void main(String[] args) throws InterruptedException{
    LinkedTransferQueue<String> ltq = new LinkedTransferQueue<>();
    new Thread(() -> {
      try { System.out.println("Producer waiting to transfer"); ltq.transfer("P"); System.out.println("Producer transferred"); }
      catch (InterruptedException e){ Thread.currentThread().interrupt(); }
    }).start();
    Thread.sleep(200);
    System.out.println("Consumer: " + ltq.take());
  }
}
Producer waiting to transfer
Consumer: P
Producer transferred

Пример E: управление нагрузкой - bounded queue и thread pool

Пример java
ThreadPoolExecutor exec = new ThreadPoolExecutor(
  2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
// при перегрузке задачи будут либо отклоняться, либо блокировать отправителя в зависимости от RejectedExecutionHandler
(task submission может быть отклонено при полном пуле и настроенном handler)

Эти примеры иллюстрируют разные способы применения блокирующих очередей от простых паттернов до более редких случаев прямой передачи и отложенных задач.

джава BlockingQueue function comments

En
BlockingQueue Интерфейс блокирующей очереди