Практические приёмы работы с большими файлами в Python
Обработка больших файлов в Python
Как обработать файл, который не помещается в оперативную память?
Наиболее эффективное решение для работы с большими файлами – чтение данных порциями (chunks) с использованием итераторов. Это позволяет минимизировать потребление памяти и обрабатывать файлы любого размера. Рекомендуется использовать менеджер контекста with open(...) as f для автоматического закрытия файла.
with open('big_file.txt', 'r') as f:
for chunk in iter(lambda: f.read(65536), ''):
process(chunk) # ваша функция обработки
ввод программ на python (ввод данных в программе python)
Пояснение:
- Файл открывается в режиме чтения.
iter(lambda: f.read(65536), '')создаёт итератор, который читает по 64 КБ до конца файла.- Каждый чанк передаётся в функцию
process(), которая может, например, фильтровать или агрегировать данные.
Типичные ошибки и их решение:
- Ошибка: MemoryError при чтении всего файла сразу – решение: использовать чтение по частям, как показано выше.
- Проблема: неправильная кодировка – при открытии файла явно указывать
encoding='utf-8'(или другую кодировку). - Ошибка: забыли закрыть файл – всегда использовать менеджер контекста
with.
Как построчно обрабатывать огромный лог-файл?
Если каждая строка содержит отдельную запись, идеально подходит итерация по строкам через for line in f:
with open('server.log', 'r', encoding='utf-8') as f:
for line in f:
if 'ERROR' in line:
print(line.strip())
Python file io (ввод-вывод файлов в python)
Этот метод эффективен, так как Python читает строку только тогда, когда она нужна.
Как получить быстрый случайный доступ к большим бинарным файлам?
Для этого применяется memory-mapped файл (mmap). Файл отображается в адресное пространство процесса, и данные читаются как массив байт.
import mmap
with open('large_binary.dat', 'r+b') as f:
with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as mm:
# Чтение данных с позиции 1024
data = mm[1024:2048]
print(data)
Python temp files (временные файлы в python)
Важно:
- Размер отображения задаётся параметром
length=0(весь файл). - Для больших файлов используйте
ACCESS_READво избежание случайного изменения.
Типичная ошибка: FileNotFoundError при открытии
Файл должен существовать, иначе mmap вызовет исключение. Предварительно проверяйте наличие файла.
Как обработать большой CSV-файл по частям с помощью pandas?
Библиотека pandas предоставляет параметр chunksize, возвращающий итератор по датафреймам:
import pandas as pd
for chunk in pd.read_csv('huge_data.csv', chunksize=5000, encoding='utf-8'):
filtered = chunk[chunk['value'] > 100]
print(len(filtered))
Python index files (индексация файлов в python)
Каждый chunk – DataFrame размером до 5000 строк. Это позволяет обработать CSV любого размера.
Как ускорить обработку большого файла с помощью многопроцессорности?
Если операции над данными независимы, можно разделить файл на части и обработать их параллельно. Пример с multiprocessing.Pool:
import multiprocessing as mp
def process_chunk(chunk):
# имитация обработки
return len(chunk)
with open('big.txt', 'r') as f:
lines = f.readlines() # только для малых файлов!
# Для больших – разбиение по чанкам
# Разделяем на 4 части
chunks = [lines[i::4] for i in range(4)]
with mp.Pool(processes=4) as pool:
results = pool.map(process_chunk, chunks)
print(sum(results))
Внимание:
- Загрузка всего файла в память (readlines) допустима только если файл помещается. Для действительно больших файлов следует разбивать на физические части (например, по номеру строки с помощью seek).
- Потоки (threading) не помогают из-за GIL, для CPU-ёмких задач лучше процессы.
Проблема: дублирование данных при параллельной обработке
Необходимо гарантировать, что каждая строка обрабатывается ровно один раз. Используйте смещения (offset) или деление на непересекающиеся блоки.
Расширенные примеры обработки больших файлов
Пример 1: Фильтрация строк с использованием memory-mapped файла
import mmap
import re
def grep_mmap(pattern, filename):
with open(filename, 'r', encoding='utf-8') as f:
with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as mm:
for match in re.finditer(pattern.encode(), mm):
start = max(0, match.start() - 50)
end = min(len(mm), match.end() + 50)
yield mm[start:end].decode('utf-8', errors='ignore')
for context in grep_mmap('ERROR', 'server.log'):
print(context[:100])
Результат:
2024-01-15 10:23:45 ERROR Connection refused from 192.168.1.1 2024-01-15 10:24:01 ERROR Timeout while waiting for response ...
Этот пример выполняет эффективный grep по большому файлу без загрузки всего файла в память. Memory-mapped файл позволяет искать по всему файлу как по массиву байт, а yield отдаёт контекст вокруг найденного совпадения.
Пример 2: Потоковая обработка JSON-логов (построчное чтение)
import json
with open('events.jsonl', 'r', encoding='utf-8') as f:
for line in f:
try:
event = json.loads(line)
if event.get('type') == 'purchase':
amount = event.get('amount', 0)
print(f"Purchase: {amount}")
except json.JSONDecodeError:
print(f"Skipping invalid line: {line[:100]}...")
Результат:
Purchase: 42.5
Purchase: 100.0
Skipping invalid line: {malformed json...
Каждая строка содержит отдельный JSON-объект. Построчное чтение позволяет обработать файл любого размера, а обработка исключений защищает от повреждённых строк.
Пример 3: Разбиение большого бинарного файла на блоки и параллельная обработка
import os
import multiprocessing as mp
def process_block(filename, start, size):
with open(filename, 'rb') as f:
f.seek(start)
data = f.read(size)
# здесь выполняется вычисление (например, хэш)
return hashlib.md5(data).hexdigest()
if __name__ == '__main__':
filename = 'large_file.bin'
block_size = 1024 * 1024 # 1 MB
file_size = os.path.getsize(filename)
offsets = [(filename, i, min(block_size, file_size - i))
for i in range(0, file_size, block_size)]
with mp.Pool(processes=4) as pool:
results = pool.starmap(process_block, offsets)
print(results[:5]) # первые 5 хэшей
Результат (первые 5 хэшей):
['d41d8cd98f00b204e9800998ecf8427e', 'f67a0b6b2e39a5d7c8e4b9a3f2c1d0e1', ...]
Файл разбивается на непересекающиеся блоки фиксированного размера (последний блок может быть меньше). Каждый блок обрабатывается независимо в отдельном процессе. Этот подход идеален для задач, где каждый блок не зависит от соседнего (например, вычисление контрольных сумм, фильтрация шума).
Пример 4: Обработка CSV с помощью Pandas и агрегация по чанкам
import pandas as pd
chunk_iter = pd.read_csv('sales.csv', chunksize=10000,
usecols=['product', 'revenue'],
dtype={'revenue': 'float32'})
result = []
for chunk in chunk_iter:
grouped = chunk.groupby('product')['revenue'].agg(['sum', 'count'])
result.append(grouped)
final = pd.concat(result).groupby(level=0).sum()
print(final.head())
Результат:
sum count
product
A 15234.5 120
B 8940.2 85
C 23401.8 200
Чанки размером 10 000 строк агрегируются отдельно, затем результаты объединяются. Использование dtype уменьшает потребление памяти, usecols загружает только нужные столбцы.
Пример 5: Генератор для построчного чтения с пропуском первых N строк
def read_skip(filepath, skip=5):
with open(filepath, 'r', encoding='utf-8') as f:
for _ in range(skip):
next(f) # пропустить заголовки
for line in f:
yield line.strip()
for line in read_skip('big_data.csv', skip=10):
process(line)
Генератор позволяет экономить память и контролировать, какие строки обрабатывать. Можно легко модифицировать для пропуска пустых строк или комментариев.