Современные технологии обработки объемных датасетов в библиотеке Pandas

Раздел: Data Science -> Pandas

Обработка больших данных с помощью Pandas

При анализе данных, объём которых превышает доступную оперативную память, стандартные методы Pandas могут приводить к ошибкам нехватки памяти. Ниже рассматриваются основные подходы, позволяющие работать с такими наборами, сохраняя производительность.

Как обработать большой CSV файл без полной загрузки в память?

Основной способ - чтение данных порциями (chunking). Параметр chunksize в pd.read_csv() возвращает итератор, выдающий фрагменты заданного размера.

import pandas as pd
chunksize = 50000
aggregated = []
for chunk in pd.read_csv('big_file.csv', chunksize=chunksize):
    # фильтрация, агрегация
    agg = chunk.groupby('category')['value'].sum()
    aggregated.append(agg)
result = pd.concat(aggregated).groupby(level=0).sum()

обработка больших данных python (обработка больших данных в python)

Каждый фрагмент обрабатывается независимо, а результаты объединяются. Это позволяет работать с файлом любого размера, ограничивая пиковое потребление памяти размером одного чанка.

Типичные проблемы и их решение:

  • Накопление списка агрегированных данных может снова превысить память при большом количестве уникальных групп. Используйте агрегацию на лету: создайте словарь или Series и обновляйте его.
  • Пропуск последнего чанка, если его размер меньше chunksize. Итератор корректно обрабатывает все строки, но если в цикле используется условие, проверьте, что последний фрагмент не пуст.
  • Операции, требующие сортировки (например, groupby с сортировкой), могут замедлиться. Отключайте сортировку параметром sort=False.

Вариант 1: Dask - распараллеливание операций Pandas

Dask предоставляет фреймворк для параллельных вычислений, имитируя API Pandas. Данные разбиваются на партиции, которые обрабатываются параллельно.

import dask.dataframe as dd
ddf = dd.read_csv('big_file.csv', blocksize=100e6)  # 100 MB блоки
result = ddf.groupby('category')['value'].mean().compute()

очистка данных python (очистка данных в python)

Вычисления выполняются лениво до вызова .compute(). Dask автоматически управляет памятью и использует несколько ядер процессора.

Вариант 2: Vaex - интерактивный анализ вне ядра

Vaex оптимизирован для работы с таблицами, не помещающимися в память, используя ленивые вычисления и эффективное отображение данных на диск.

import vaex
df = vaex.from_csv('big_file.csv', convert=True, chunk_size=1000000)
df.mean('value')  # вычисление без загрузки всей таблицы

Python подготовка данных (подготовка данных в python)

Vaex поддерживает визуализацию, статистику и перемещение данных в памяти только при необходимости.

Вариант 3: Modin - ускорение без смены API

Modin заменяет импорт Pandas и распределяет вычисления на все доступные ядра с помощью Ray или Dask.

# import modin.pandas as pd
# df = pd.read_csv('big_file.csv')
# df.groupby('col').sum()

работа с dataframe python (работа с dataframe в python)

Modin часто даёт ускорение в 2-4 раза на многоядерных системах, но может потреблять больше памяти из-за промежуточных копий.

Вариант 4: Оптимизация типов данных

Снижение объёма памяти возможно за счёт более компактных типов: категории для строковых столбцов с малым количеством уникальных значений, целые типы с меньшей разрядностью, использование pd.array с nullable целыми.

dtypes = {'country': 'category', 'age': 'int8', 'salary': 'float32'}
df = pd.read_csv('big_file.csv', dtype=dtypes)

Python работа с большими данными (работа с большими данными в python)

Это особенно эффективно перед чтением, если известна структура. Анализ занимаемой памяти: df.info(memory_usage='deep').

Проблемы при оптимизации типов:

  • Неправильный выбор типа (например, int8 для значения более 127) приводит к переполнению. Используйте infer_objects() или проверяйте диапазоны.
  • Категориальный тип неэффективен при большом числе уникальных значений (более 10% от общего числа строк).

Вариант 5: Использование формата Parquet

Parquet - колоночный формат с сжатием, поддерживающий фильтрацию на уровне файла. Pandas читает его быстрее, чем CSV.

df.to_parquet('data.parquet')
df = pd.read_parquet('data.parquet', columns=['needed_cols'])

При работе с большими данными рекомендуется сразу сохранять промежуточные результаты в Parquet, а не CSV.

- Python код символа (код символа в python)
- код из файла python (код из файла python)
- обработка данных на python (обработка данных на python)

Пример 1: Потоковая агрегация с накоплением в словаре

При большом количестве уникальных групп группировка через список может быть заменена обновлением словаря.

Пример
import pandas as pd
from collections import defaultdict

chunksize = 50000
aggregated = defaultdict(float)

for chunk in pd.read_csv('sales.csv', chunksize=chunksize, usecols=['product','revenue']):
    for idx, row in chunk.iterrows():
        aggregated[row['product']] += row['revenue']

result = pd.Series(aggregated)
product
A    125000.5
B     89000.2
...
dtype: float64

Этот метод потребляет память, пропорциональную числу уникальных продуктов, а не размеру данных.

Пример 2: Параллельная обработка чанков с помощью multiprocessing

Можно ускорить обработку, распределяя чанки между процессами.

Пример
import pandas as pd
from multiprocessing import Pool
import numpy as np

def process_chunk(chunk):
    return chunk.groupby('cat')['val'].sum()

chunks = [chunk for chunk in pd.read_csv('data.csv', chunksize=100000)]

with Pool(4) as p:
    results = p.map(process_chunk, chunks)

final = pd.concat(results).groupby(level=0).sum()

Количество процессов обычно равно числу ядер. Важно, чтобы каждая функция была self-contained и не требовала большого объёма данных между процессами.

Пример 3: Dask с несколькими операциями (фильтрация, join, агрегация)

Dask позволяет выполнять сложные цепочки преобразований на наборах больше памяти.

Пример
import dask.dataframe as dd

# загрузка двух датафреймов
df1 = dd.read_csv('orders.csv', blocksize=100e6)
df2 = dd.read_csv('customers.csv', blocksize=100e6)

# фильтрация и объединение
filtered = df1[df1['amount'] > 100]
joined = filtered.merge(df2, on='customer_id')

# агрегация и вычисление
result = joined.groupby('region')['amount'].sum().compute()

print(result)
region
North America    1.23e6
Europe           0.89e6
...
Name: amount, dtype: float64

Dask выполняет операции лениво, оптимизируя план выполнения.

Пример 4: Оптимизация типов и чтение с указанием формата Parquet

Комбинация компактных типов и Parquet даёт выигрыш по времени и памяти.

Пример
import pandas as pd
import pyarrow.parquet as pq

# конвертация CSV в Parquet с оптимизированными типами
schema = {
    'id': 'int32',
    'name': 'string',
    'salary': 'float32',
    'city': 'category'
}
df = pd.read_csv('employees.csv', dtype=schema)
df.to_parquet('employees.parquet', compression='snappy')

# чтение только нужных столбцов
df = pd.read_parquet('employees.parquet', columns=['city','salary'])

# анализ памяти
print(df.memory_usage(deep=True))
Index           128
city         83502
salary      409600
dtype: int64

Обработка больших данных в Python - comments

En
обработка больших данных python (python)