ReadableStream: примеры (JAVASCRIPT)
ReadableStream(underlyingSource (object), strategy (object)): ReadableStreamБазовое описание ReadableStream
Объект ReadableStream является частью Streams API и представляет собой источник потоковых данных в JavaScript. Он используется для обработки больших объемов информации по частям (чанкам), что позволяет эффективно работать с сетевыми запросами, большими файлами или любыми другими данными, которые нецелесообразно загружать в память целиком.
Конструктор ReadableStream принимает необязательный объект-аргумент с тремя возможными свойствами:
- start(controller): функция, вызываемая сразу при создании потока. Получает объект
controller(ReadableStreamDefaultController или ReadableByteStreamController) для управления внутренней очередью. Может быть асинхронной. - pull(controller): (опционально) функция, вызываемая повторно, когда внутренняя очередь опустела, пока поток не будет закрыт или не отменен. Используется для управления наполнением потока.
- cancel(reason): (опционально) функция, вызываемая при отмене потребления потока потребителем (например, вызовом
cancel()). - type: (опционально) может быть
'bytes'для создания byte-oriented потока (ReadableByteStream), что дает более низкоуровневый контроль над передаваемыми данными. - autoAllocateChunkSize: (опционально) для byte streams определяет размер автоматически выделяемого буфера.
Основные методы экземпляра:
- getReader(): создает и возвращает Reader (ReadableStreamDefaultReader или ReadableStreamBYOBReader) для чтения данных из потока. После вызова поток становится заблокированным для этого читателя.
- pipeThrough(transformStream, options): передает поток через цепочку преобразований (TransformStream).
- pipeTo(destination, options): направляет поток в WritableStream (например, в файл или другой поток).
- tee(): разветвляет поток, возвращая массив из двух новых ReadableStream, которые идентичны оригиналу.
- cancel(reason): отменяет поток.
ReadableStream используется в Fetch API (body ответа response.body), в работе с файлами через File API, и для создания пользовательских потоков данных.
Простые примеры использования
Создание простого потока из статических данных:
const stream = new ReadableStream({
start(controller) {
controller.enqueue('Привет, ');
controller.enqueue('поток!');
controller.close();
}
});
const reader = stream.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // 'Привет, '
return reader.read();
}).then(({ value, done }) => {
console.log(value); // 'поток!'
reader.releaseLock();
});{ value: "Привет, ", done: false }
{ value: "поток!", done: false }
{ value: undefined, done: true }Поток, генерирующий числа с задержкой:
const numberStream = new ReadableStream({
start(controller) {
let num = 1;
const intervalId = setInterval(() => {
if (num <= 5) {
controller.enqueue(num++);
} else {
clearInterval(intervalId);
controller.close();
}
}, 500);
}
});
(async () => {
const reader = numberStream.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
console.log(value); // 1, 2, 3, 4, 5 с интервалом 500 мс
}
})();1 2 3 4 5
Похожие API в JavaScript
Iterable (Async Iterable): объекты, реализующие протокол итерации ([Symbol.iterator] или [Symbol.asyncIterator]). Позволяют использовать циклы for...of и for await...of. Проще в использовании для синхронных или асинхронных коллекций, но не предоставляют стандартного механизма отмены или управления давлением (backpressure).
EventTarget / EventEmitter: классическая модель событий. Хорошо подходит для событийной архитектуры, но не стандартизирована для потоков данных и не имеет встроенной обработки backpressure.
XMLHttpRequest: более старый API для сетевых запросов. Может получать ответ как текст или бинарные данные, но не в потоковом режиме по умолчанию.
ReadableStream предпочтительнее, когда требуется контроль над потоком данных, обработка backpressure, работа с большими файлами или интеграция с современными API (Fetch, Service Workers, File System Access API). Async Iterable часто удобнее для простого последовательного чтения асинхронных данных, если не нужны возможности управления потоком.
Аналоги в других языках
Python (asyncio.StreamReader): часть asyncio для асинхронного чтения потоковых данных. Работает в связке с StreamWriter.
# Пример чтения из потока
import asyncio
async def read_stream():
reader, writer = await asyncio.open_connection('example.com', 80)
writer.write(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
await writer.drain()
data = await reader.read(100)
print(data)
writer.close()
await writer.wait_closed()Go (io.Reader): интерфейс с методом Read(p []byte) (n int, err error). Крайне распространен в стандартной библиотеке. Работает с байтовыми срезами.
// Чтение из файла
package main
import (
"fmt"
"os"
)
func main() {
file, _ := os.Open("file.txt")
buffer := make([]byte, 1024)
n, _ := file.Read(buffer)
fmt.Println(string(buffer[:n]))
}Rust (std::io::Read): трейт с методом read(&mut self, buf: &mut [u8]) -> Result<usize>. Аналогичен Go по концепции, но с использованием системы владения Rust.
// Чтение строки из stdin
use std::io::{self, Read};
fn main() -> io::Result<()> {
let mut input = String::new();
io::stdin().read_to_string(&mut input)?;
println!("{}", input);
Ok(())
}Отличия от JavaScript: многие языки используют синхронные или блокирующие интерфейсы чтения, либо интерфейсы на основе байтовых массивов/срезов. ReadableStream интегрирован в асинхронную модель JavaScript и предоставляет более высокоуровневую абстракцию через контроллеры и читателей.
Распространенные ошибки
Попытка чтения без получения reader или после его освобождения: поток можно читать только после вызова getReader() и до вызова releaseLock().
const stream = new ReadableStream({/*...*/});
// Неправильно:
stream.read(); // TypeError: stream.read is not a function
// Правильно:
const reader = stream.getReader();
reader.read();Игнорирование свойства done: может привести к бесконечному циклу или попытке обработать данные после завершения потока.
// Рискованный код
while (true) {
const { value } = await reader.read(); // Если поток завершится, value будет undefined
console.log(value.toUpperCase()); // Ошибка при value = undefined
}Добавление данных в закрытый поток: после вызова controller.close() или controller.error() добавлять данные нельзя.
const stream = new ReadableStream({
start(controller) {
controller.close();
controller.enqueue('данные'); // TypeError: Cannot enqueue into a closed stream
}
});Необработка ошибок потока: ошибки в источнике потока или при чтении должны быть обработаны.
const stream = new ReadableStream({
start(controller) {
throw new Error('Ошибка в источнике!');
}
});
const reader = stream.getReader();
reader.read().catch(err => console.error('Поймана ошибка:', err));Последние изменения
Спецификация Streams API продолжает развиваться. Одним из значительных недавних добавлений стала поддержка ReadableStream.from() (стадия предложения, но уже реализована в некоторых движках). Этот статический метод позволяет легко создавать поток из итерируемого или асинхронно итерируемого объекта.
// Создание потока из массива
const streamFromArray = ReadableStream.from([1, 2, 3]);
// Создание потока из асинхронного генератора
async function* asyncGen() {
yield 'a'; yield 'b';
}
const streamFromAsyncGen = ReadableStream.from(asyncGen());Также продолжается работа над улучшением интеграции с Async Iterable, включая более удобные методы потребления потоков как асинхронных итераторов (например, через Symbol.asyncIterator). В движках постоянно оптимизируется производительность и исправляются краевые случаи в реализации.
Расширенные примеры
Создание byte stream с использованием типа 'bytes': Позволяет работать с бинарными данными и использовать BYOB (Bring Your Own Buffer) чтение для эффективного управления памятью.
const byteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// controller - ReadableByteStreamController
const view = new Uint8Array([65, 66, 67, 68]); // "ABCD"
controller.enqueue(view);
controller.close();
}
});
// BYOB чтение
const reader = byteStream.getReader({ mode: 'byob' });
const buffer = new ArrayBuffer(4);
const view = new Uint8Array(buffer);
reader.read(view).then(({ value, done }) => {
console.log(new TextDecoder().decode(value)); // "ABCD"
});Трансформация потока через pipeThrough:
// Создаем TransformStream для преобразования текста в верхний регистр
const toUpperCaseTransformer = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toString().toUpperCase());
}
});
const originalStream = new ReadableStream({
start(controller) {
controller.enqueue('привет');
controller.enqueue('мир');
controller.close();
}
});
const transformedStream = originalStream.pipeThrough(toUpperCaseTransformer);
// Чтение из преобразованного потока
(async () => {
const reader = transformedStream.getReader();
for await (const chunk of reader) { // Используем асинхронную итерацию
console.log(chunk); // 'ПРИВЕТ', 'МИР'
}
})();Отмена потока с логированием причины:
const stream = new ReadableStream({
start(controller) {
controller.enqueue('Данные 1');
// Предположим, мы планируем добавить еще данных, но...
},
cancel(reason) {
console.log(`Поток отменен по причине: ${reason}`);
// Здесь можно освободить ресурсы (закрыть соединение и т.д.)
}
});
const reader = stream.getReader();
reader.read(); // Читаем первый чанк
// Потребитель решает отменить поток
reader.cancel('Пользователь нажал отмену').then(() => {
console.log('Отмена завершена');
});Имитация чтения с сетевой задержкой и обработкой backpressure: Использование метода pull позволяет реагировать на скорость потребителя.
let count = 0;
const slowStream = new ReadableStream({
pull(controller) {
// Этот метод вызывается, когда потребитель готов принять новые данные
return new Promise(resolve => {
setTimeout(() => {
controller.enqueue(`Чанк #${++count}`);
if (count >= 10) {
controller.close();
}
resolve();
}, 200); // Имитация задержки сети или обработки
});
}
});
// Потребитель читает медленнее, чем генерируются данные
(async () => {
const reader = slowStream.getReader();
for (let i = 0; i < 12; i++) { // Попробуем прочитать больше, чем есть
const { value, done } = await reader.read();
if (done) {
console.log('Поток завершен.');
break;
}
console.log(value);
await new Promise(r => setTimeout(r, 300)); // Потребитель обрабатывает 300 мс
}
})();