Обработка крупных наборов данных в Python: от Pandas до распределённых инструментов

Раздел: Data Science -> Pandas

Практические приёмы работы с объёмными данными в среде Python

Как обработать датасет, который не помещается в оперативную память?

Основной подход - загрузка данных частями (chunks) с помощью параметра chunksize в функции pd.read_csv. Это позволяет обрабатывать файлы любого размера, не нагружая RAM.

import pandas as pd

chunk_iter = pd.read_csv('large_dataset.csv', chunksize=50000)

results = []
for chunk in chunk_iter:
    # Фильтрация и агрегация внутри одного чанка
    filtered = chunk[chunk['value'] > 100]
    results.append(filtered['amount'].sum())

# Сборка итогового результата
total = sum(results)
print(f"Сумма amount для value > 100: {total}")

обработка больших данных python (обработка больших данных в python)

Пояснение шагов:

  • chunksize=50000 - размер одного блока (число строк). Подбирается исходя из доступной памяти и плотности данных.
  • Цикл for chunk in chunk_iter - ленивая загрузка: каждый следующий чанк читается только после обработки предыдущего.
  • Локальные результаты собираются в список и финально агрегируются.

Типичные проблемы:

  • Медленная итерация - при очень маленьком chunksize обработка затягивается. Решение: увеличить размер блока до 100 000–500 000 строк, но следить за памятью.
  • Некорректные типы данных - Pandas пытается угадать типы, что может привести к повторному чтению. Решение: явно задать dtype для всех столбцов.
  • Потеря контекста между чанками - для оконных функций или группировок по всему датасету требуется аккумуляция. Решение: использовать промежуточные структуры (словари, счетчики) или перейти к Dask.

Цель использования: обработка файлов, размер которых превышает доступную оперативную память, без перехода на специализированные Big Data инструменты.

Как выполнить Pandas-подобные операции над данными, которые больше памяти?

Библиотека Dask предоставляет параллельный фреймворк, имитирующий pandas API, и разбивает вычисления на задачи, не помещая все данные в память.

import dask.dataframe as dd

# Загрузка как Dask DataFrame (лениво)
ddf = dd.read_csv('large_dataset.csv', blocksize='100MB')

# Операции - ленивые, результат не вычисляется до .compute()
result = ddf.groupby('category')['value'].mean().compute()
print(result)

очистка данных python (очистка данных в python)

Пояснение:

  • blocksize задаёт размер одного разбиения (partition).
  • Все преобразования записываются в граф вычислений, реальная работа начинается только при вызове .compute().
  • Можно выполнять группировки, объединения, оконные функции, фильтрации.

Характерные ошибки:

  • Переполнение памяти при .compute() - если результат слишком велик, он не поместится в RAM. Решение: перед вызовом compute применять дополнительную агрегацию или использовать .to_csv(...) для записи по частям.
  • Неверный тип данных столбцов - Dask не умеет автоматически определять типы для больших файлов. Рекомендуется явно указывать dtype.

Случаи использования: ETL-процессы с файлами размером в десятки и сотни гигабайт, когда требуется богатый функционал pandas, но без ущерба для масштабируемости.

Как интерактивно исследовать миллиарды строк без перегрузки памяти?

Библиотека Vaex использует отложенные вычисления и выборочную загрузку, позволяя работать с большими файлами (до терабайт) на ноутбуке.

import vaex

# Открываем файл (формат HDF5 или Arrow)
df = vaex.open('large_data.hdf5')

# Фильтрация не загружает данные, пока не требуется визуализация
filtered = df[df.x > 0]

# Мгновенный доступ к статистике без загрузки всего столбца
print(filtered.mean('y'))

Python подготовка данных (подготовка данных в python)

Пояснение: Vaex хранит данные в памяти лениво, использует отображение файлов (memory mapping) и эффективное сжатие. Операции вроде mean() выполняются за один проход по файлу.

Возможные сложности:

  • Необходимость конвертации - Vaex наиболее эффективен с собственными форматами (HDF5, Arrow). Конвертация CSV может занять время.
  • Ограниченная поддержка операций - не все функции pandas переведены в Vaex. Для сложной агрегации может потребоваться экспорт в Pandas.

Когда использовать: разведочный анализ данных (EDA), визуализация, проверка гипотез на наборах данных от десятков до сотен гигабайт.

Как ускорить существующий pandas код без переписывания?

Библиотека Modin позволяет заменить import pandas as pd на import modin.pandas as pd, автоматически распределяя вычисления по всем ядрам.

import modin.pandas as pd

# Код остаётся практически без изменений
df = pd.read_csv('large_dataset.csv')
result = df.groupby('category')['value'].sum()
print(result)

Python работа с большими данными (работа с большими данными в python)

Особенности: Modin использует Ray или Dask в качестве бэкенда. Прирост производительности заметен на многопроцессорных системах и файлах размером от 1 ГБ.

Проблемы и пути решения:

  • Несовместимость с некоторыми функциями pandas - например, resample или crosstab могут не поддерживаться. Решение: использовать оригинальный pandas для этих участков.
  • Большое потребление памяти при материализации - Modin стремится разбить данные на части, но финальный результат может быть собран в одном месте. Для очень больших результатов применяют to_csv или to_parquet частями.

Кому подходит: командам, уже имеющим код на pandas, которые хотят ускорить его без кардинальных изменений.

Как оптимизировать типы данных для уменьшения используемой памяти?

Ещё один эффективный приём - приведение столбцов к более экономным типам (int8, float32, категории). Это особенно полезно при загрузке небольших кусков данных.

import pandas as pd

dtype_spec = {
    'user_id': 'int32',
    'age': 'uint8',
    'item': 'category',
    'price': 'float32'
}

chunk_iter = pd.read_csv('transactions.csv', chunksize=100000, dtype=dtype_spec)

for chunk in chunk_iter:
    # Обработка...
    pass

Пояснение: Явное задание типов избавляет от повторных проходов по файлу при угадывании и снижает занимаемую память в 2–4 раза.

Распространённые ошибки:

  • Переполнение типа - если в столбце есть значения, выходящие за диапазон (например, int8 не подходит для возраста 200). Решение: предварительно оценить распределение или использовать int16/int32.
  • Игнорирование пропусков - тип category плохо совместим с NaN. Рекомендуется обрабатывать пропуски до преобразования.

Когда применяют: при работе с данными фиксированной схемы (журналы, транзакции) для снижения нагрузки на память.

- код из файла python (код из файла python)
- обработка данных на python (обработка данных на python)
- обработка символьных данных python (обработка символьных данных в python)

Ниже приведены расширенные примеры, демонстрирующие нестандартные приёмы работы с большими данными.

Пример 1: Параллельная обработка чанков с помощью concurrent.futures

Вместо последовательного цикла можно распределить обработку чанков по нескольким процессам.

Пример
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk):
    # Моделируем тяжёлую агрегацию
    return chunk[chunk['value'] > 50]['amount'].sum()

chunks = pd.read_csv('large.csv', chunksize=100000)

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_chunk, chunks))

total = sum(results)
print(f"Итоговая сумма: {total}")
Итоговая сумма: 1234567890

Пояснение: каждый чанк обрабатывается отдельным процессом, что утилизирует несколько ядер. Важно убедиться, что функция process_chunk не требует много памяти на один вызов.

Пример 2: Создание виртуальных столбцов в Vaex для экономии памяти

Vaex позволяет добавлять вычисляемые столбцы без их материализации.

Пример
import vaex

df = vaex.open('sensor_data.hdf5')
df['temperature_f'] = df.temperature_c * 9/5 + 32
df['status'] = df.temperature_c > 100

print(df[['timestamp', 'temperature_c', 'temperature_f', 'status']].head(5))
  timestamp  temperature_c  temperature_f  status
0         1         98.0        208.4   False
1         2        102.5        216.5    True
2         3         99.9        211.8   False
3         4        105.0        221.0    True
4         5         97.8        208.0   False

Пояснение: столбцы temperature_f и status не занимают дополнительного места, так как являются виртуальными и вычисляются на лету.

Пример 3: Использование Dask для скользящего окна по большому датасету

Dask поддерживает оконные функции, например rolling mean, с автоматической сегментацией.

Пример
import dask.dataframe as dd

df = dd.read_csv('timeseries.csv', blocksize='50MB')
df = df.set_index('timestamp')
rolling_mean = df['value'].rolling(window=10).mean().compute()
print(rolling_mean.head(15))
timestamp
0         NaN
1         NaN
2         NaN
3         NaN
4         NaN
5         NaN
6         NaN
7         NaN
8         NaN
9         5.2
10        5.5
11        5.8
12        6.0
13        6.3
14        6.7
Name: value, dtype: float64

Пояснение: Dask разбивает DataFrame по времени (благодаря индексу) и обрабатывает каждую часть с перекрытием границ, чтобы окно работало корректно.

Пример 4: Чтение данных из нескольких файлов с помощью glob и Modin

Modin умеет автоматически распределять чтение множества CSV-файлов.

Пример
import modin.pandas as pd
import glob

files = glob.glob('data/2023/*.csv')
df = pd.concat([pd.read_csv(f) for f in files])

print(f"Загружено строк: {len(df)}")
print(df.groupby('region')['revenue'].sum())
Загружено строк: 45200000
region
North    5.123e8
South    3.789e8
East     2.456e8
West     4.567e8
Name: revenue, dtype: float64

Пояснение: Modin параллельно читает каждый файл, а объединённый DataFrame остаётся ленивым до момента вычисления.

Пример 5: Оптимизация памяти с помощью downcast в Pandas

Автоматическое понижение типов после загрузки снижает потребление.

Пример
import pandas as pd

df = pd.read_csv('sales.csv')
# Выбор только числовых столбцов
num_cols = df.select_dtypes(include=['int64', 'float64']).columns

for col in num_cols:
    df[col] = pd.to_numeric(df[col], downcast='integer' if df[col].dtype == 'int64' else 'float')

print(f"Память до: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB (после downcast)")
Память до: 124.3 MB (после downcast)

Пояснение: downcast автоматически выбирает минимальный подтип (int8, int16, float32 и т.д.), что особенно эффективно для столбцов с ограниченным диапазоном значений.

Работа с большими данными в Python - comments

En
Python работа с большими данными (python)