ReadableStream: примеры (JAVASCRIPT)

Работа с потоками данных: ReadableStream
Раздел: Streams API, Потоки чтения
ReadableStream(underlyingSource (object), strategy (object)): ReadableStream

Базовое описание ReadableStream

Объект ReadableStream является частью Streams API и представляет собой источник потоковых данных в JavaScript. Он используется для обработки больших объемов информации по частям (чанкам), что позволяет эффективно работать с сетевыми запросами, большими файлами или любыми другими данными, которые нецелесообразно загружать в память целиком.

Конструктор ReadableStream принимает необязательный объект-аргумент с тремя возможными свойствами:

  • start(controller): функция, вызываемая сразу при создании потока. Получает объект controller (ReadableStreamDefaultController или ReadableByteStreamController) для управления внутренней очередью. Может быть асинхронной.
  • pull(controller): (опционально) функция, вызываемая повторно, когда внутренняя очередь опустела, пока поток не будет закрыт или не отменен. Используется для управления наполнением потока.
  • cancel(reason): (опционально) функция, вызываемая при отмене потребления потока потребителем (например, вызовом cancel()).
  • type: (опционально) может быть 'bytes' для создания byte-oriented потока (ReadableByteStream), что дает более низкоуровневый контроль над передаваемыми данными.
  • autoAllocateChunkSize: (опционально) для byte streams определяет размер автоматически выделяемого буфера.

Основные методы экземпляра:

  • getReader(): создает и возвращает Reader (ReadableStreamDefaultReader или ReadableStreamBYOBReader) для чтения данных из потока. После вызова поток становится заблокированным для этого читателя.
  • pipeThrough(transformStream, options): передает поток через цепочку преобразований (TransformStream).
  • pipeTo(destination, options): направляет поток в WritableStream (например, в файл или другой поток).
  • tee(): разветвляет поток, возвращая массив из двух новых ReadableStream, которые идентичны оригиналу.
  • cancel(reason): отменяет поток.

ReadableStream используется в Fetch API (body ответа response.body), в работе с файлами через File API, и для создания пользовательских потоков данных.

Простые примеры использования

Создание простого потока из статических данных:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Привет, ');
    controller.enqueue('поток!');
    controller.close();
  }
});

const reader = stream.getReader();
reader.read().then(({ value, done }) => {
  console.log(value); // 'Привет, '
  return reader.read();
}).then(({ value, done }) => {
  console.log(value); // 'поток!'
  reader.releaseLock();
});
{ value: "Привет, ", done: false }
{ value: "поток!", done: false }
{ value: undefined, done: true }

Поток, генерирующий числа с задержкой:

const numberStream = new ReadableStream({
  start(controller) {
    let num = 1;
    const intervalId = setInterval(() => {
      if (num <= 5) {
        controller.enqueue(num++);
      } else {
        clearInterval(intervalId);
        controller.close();
      }
    }, 500);
  }
});

(async () => {
  const reader = numberStream.getReader();
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    console.log(value); // 1, 2, 3, 4, 5 с интервалом 500 мс
  }
})();
1
2
3
4
5

Похожие API в JavaScript

Iterable (Async Iterable): объекты, реализующие протокол итерации ([Symbol.iterator] или [Symbol.asyncIterator]). Позволяют использовать циклы for...of и for await...of. Проще в использовании для синхронных или асинхронных коллекций, но не предоставляют стандартного механизма отмены или управления давлением (backpressure).

EventTarget / EventEmitter: классическая модель событий. Хорошо подходит для событийной архитектуры, но не стандартизирована для потоков данных и не имеет встроенной обработки backpressure.

XMLHttpRequest: более старый API для сетевых запросов. Может получать ответ как текст или бинарные данные, но не в потоковом режиме по умолчанию.

ReadableStream предпочтительнее, когда требуется контроль над потоком данных, обработка backpressure, работа с большими файлами или интеграция с современными API (Fetch, Service Workers, File System Access API). Async Iterable часто удобнее для простого последовательного чтения асинхронных данных, если не нужны возможности управления потоком.

Аналоги в других языках

Python (asyncio.StreamReader): часть asyncio для асинхронного чтения потоковых данных. Работает в связке с StreamWriter.

# Пример чтения из потока
import asyncio

async def read_stream():
    reader, writer = await asyncio.open_connection('example.com', 80)
    writer.write(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
    await writer.drain()
    data = await reader.read(100)
    print(data)
    writer.close()
    await writer.wait_closed()

Go (io.Reader): интерфейс с методом Read(p []byte) (n int, err error). Крайне распространен в стандартной библиотеке. Работает с байтовыми срезами.

// Чтение из файла
package main
import (
    "fmt"
    "os"
)
func main() {
    file, _ := os.Open("file.txt")
    buffer := make([]byte, 1024)
    n, _ := file.Read(buffer)
    fmt.Println(string(buffer[:n]))
}

Rust (std::io::Read): трейт с методом read(&mut self, buf: &mut [u8]) -> Result<usize>. Аналогичен Go по концепции, но с использованием системы владения Rust.

// Чтение строки из stdin
use std::io::{self, Read};
fn main() -> io::Result<()> {
    let mut input = String::new();
    io::stdin().read_to_string(&mut input)?;
    println!("{}", input);
    Ok(())
}

Отличия от JavaScript: многие языки используют синхронные или блокирующие интерфейсы чтения, либо интерфейсы на основе байтовых массивов/срезов. ReadableStream интегрирован в асинхронную модель JavaScript и предоставляет более высокоуровневую абстракцию через контроллеры и читателей.

Распространенные ошибки

Попытка чтения без получения reader или после его освобождения: поток можно читать только после вызова getReader() и до вызова releaseLock().

const stream = new ReadableStream({/*...*/});
// Неправильно:
stream.read(); // TypeError: stream.read is not a function
// Правильно:
const reader = stream.getReader();
reader.read();

Игнорирование свойства done: может привести к бесконечному циклу или попытке обработать данные после завершения потока.

// Рискованный код
while (true) {
  const { value } = await reader.read(); // Если поток завершится, value будет undefined
  console.log(value.toUpperCase()); // Ошибка при value = undefined
}

Добавление данных в закрытый поток: после вызова controller.close() или controller.error() добавлять данные нельзя.

const stream = new ReadableStream({
  start(controller) {
    controller.close();
    controller.enqueue('данные'); // TypeError: Cannot enqueue into a closed stream
  }
});

Необработка ошибок потока: ошибки в источнике потока или при чтении должны быть обработаны.

const stream = new ReadableStream({
  start(controller) {
    throw new Error('Ошибка в источнике!');
  }
});
const reader = stream.getReader();
reader.read().catch(err => console.error('Поймана ошибка:', err));

Последние изменения

Спецификация Streams API продолжает развиваться. Одним из значительных недавних добавлений стала поддержка ReadableStream.from() (стадия предложения, но уже реализована в некоторых движках). Этот статический метод позволяет легко создавать поток из итерируемого или асинхронно итерируемого объекта.

// Создание потока из массива
const streamFromArray = ReadableStream.from([1, 2, 3]);
// Создание потока из асинхронного генератора
async function* asyncGen() {
  yield 'a'; yield 'b';
}
const streamFromAsyncGen = ReadableStream.from(asyncGen());

Также продолжается работа над улучшением интеграции с Async Iterable, включая более удобные методы потребления потоков как асинхронных итераторов (например, через Symbol.asyncIterator). В движках постоянно оптимизируется производительность и исправляются краевые случаи в реализации.

Расширенные примеры

Создание byte stream с использованием типа 'bytes': Позволяет работать с бинарными данными и использовать BYOB (Bring Your Own Buffer) чтение для эффективного управления памятью.

Пример javascript
const byteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // controller - ReadableByteStreamController
    const view = new Uint8Array([65, 66, 67, 68]); // "ABCD"
    controller.enqueue(view);
    controller.close();
  }
});

// BYOB чтение
const reader = byteStream.getReader({ mode: 'byob' });
const buffer = new ArrayBuffer(4);
const view = new Uint8Array(buffer);
reader.read(view).then(({ value, done }) => {
  console.log(new TextDecoder().decode(value)); // "ABCD"
});

Трансформация потока через pipeThrough:

Пример javascript
// Создаем TransformStream для преобразования текста в верхний регистр
const toUpperCaseTransformer = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toString().toUpperCase());
  }
});

const originalStream = new ReadableStream({
  start(controller) {
    controller.enqueue('привет');
    controller.enqueue('мир');
    controller.close();
  }
});

const transformedStream = originalStream.pipeThrough(toUpperCaseTransformer);

// Чтение из преобразованного потока
(async () => {
  const reader = transformedStream.getReader();
  for await (const chunk of reader) { // Используем асинхронную итерацию
    console.log(chunk); // 'ПРИВЕТ', 'МИР'
  }
})();

Отмена потока с логированием причины:

Пример javascript
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Данные 1');
    // Предположим, мы планируем добавить еще данных, но...
  },
  cancel(reason) {
    console.log(`Поток отменен по причине: ${reason}`);
    // Здесь можно освободить ресурсы (закрыть соединение и т.д.)
  }
});

const reader = stream.getReader();
reader.read(); // Читаем первый чанк
// Потребитель решает отменить поток
reader.cancel('Пользователь нажал отмену').then(() => {
  console.log('Отмена завершена');
});

Имитация чтения с сетевой задержкой и обработкой backpressure: Использование метода pull позволяет реагировать на скорость потребителя.

Пример javascript
let count = 0;
const slowStream = new ReadableStream({
  pull(controller) {
    // Этот метод вызывается, когда потребитель готов принять новые данные
    return new Promise(resolve => {
      setTimeout(() => {
        controller.enqueue(`Чанк #${++count}`);
        if (count >= 10) {
          controller.close();
        }
        resolve();
      }, 200); // Имитация задержки сети или обработки
    });
  }
});

// Потребитель читает медленнее, чем генерируются данные
(async () => {
  const reader = slowStream.getReader();
  for (let i = 0; i < 12; i++) { // Попробуем прочитать больше, чем есть
    const { value, done } = await reader.read();
    if (done) {
      console.log('Поток завершен.');
      break;
    }
    console.log(value);
    await new Promise(r => setTimeout(r, 300)); // Потребитель обрабатывает 300 мс
  }
})();

JS ReadableStream function comments

En
ReadableStream Represents a readable stream of data