Современные технологии обработки объемных датасетов в библиотеке Pandas
Обработка больших данных с помощью Pandas
При анализе данных, объём которых превышает доступную оперативную память, стандартные методы Pandas могут приводить к ошибкам нехватки памяти. Ниже рассматриваются основные подходы, позволяющие работать с такими наборами, сохраняя производительность.
Как обработать большой CSV файл без полной загрузки в память?
Основной способ - чтение данных порциями (chunking). Параметр chunksize в pd.read_csv() возвращает итератор, выдающий фрагменты заданного размера.
import pandas as pd
chunksize = 50000
aggregated = []
for chunk in pd.read_csv('big_file.csv', chunksize=chunksize):
# фильтрация, агрегация
agg = chunk.groupby('category')['value'].sum()
aggregated.append(agg)
result = pd.concat(aggregated).groupby(level=0).sum()обработка больших данных python (обработка больших данных в python)
Каждый фрагмент обрабатывается независимо, а результаты объединяются. Это позволяет работать с файлом любого размера, ограничивая пиковое потребление памяти размером одного чанка.
Типичные проблемы и их решение:
- Накопление списка агрегированных данных может снова превысить память при большом количестве уникальных групп. Используйте агрегацию на лету: создайте словарь или Series и обновляйте его.
- Пропуск последнего чанка, если его размер меньше chunksize. Итератор корректно обрабатывает все строки, но если в цикле используется условие, проверьте, что последний фрагмент не пуст.
- Операции, требующие сортировки (например, groupby с сортировкой), могут замедлиться. Отключайте сортировку параметром sort=False.
Вариант 1: Dask - распараллеливание операций Pandas
Dask предоставляет фреймворк для параллельных вычислений, имитируя API Pandas. Данные разбиваются на партиции, которые обрабатываются параллельно.
import dask.dataframe as dd
ddf = dd.read_csv('big_file.csv', blocksize=100e6) # 100 MB блоки
result = ddf.groupby('category')['value'].mean().compute()очистка данных python (очистка данных в python)
Вычисления выполняются лениво до вызова .compute(). Dask автоматически управляет памятью и использует несколько ядер процессора.
Вариант 2: Vaex - интерактивный анализ вне ядра
Vaex оптимизирован для работы с таблицами, не помещающимися в память, используя ленивые вычисления и эффективное отображение данных на диск.
import vaex
df = vaex.from_csv('big_file.csv', convert=True, chunk_size=1000000)
df.mean('value') # вычисление без загрузки всей таблицы
Python подготовка данных (подготовка данных в python)
Vaex поддерживает визуализацию, статистику и перемещение данных в памяти только при необходимости.
Вариант 3: Modin - ускорение без смены API
Modin заменяет импорт Pandas и распределяет вычисления на все доступные ядра с помощью Ray или Dask.
# import modin.pandas as pd
# df = pd.read_csv('big_file.csv')
# df.groupby('col').sum()работа с dataframe python (работа с dataframe в python)
Modin часто даёт ускорение в 2-4 раза на многоядерных системах, но может потреблять больше памяти из-за промежуточных копий.
Вариант 4: Оптимизация типов данных
Снижение объёма памяти возможно за счёт более компактных типов: категории для строковых столбцов с малым количеством уникальных значений, целые типы с меньшей разрядностью, использование pd.array с nullable целыми.
dtypes = {'country': 'category', 'age': 'int8', 'salary': 'float32'}
df = pd.read_csv('big_file.csv', dtype=dtypes)Python работа с большими данными (работа с большими данными в python)
Это особенно эффективно перед чтением, если известна структура. Анализ занимаемой памяти: df.info(memory_usage='deep').
Проблемы при оптимизации типов:
- Неправильный выбор типа (например, int8 для значения более 127) приводит к переполнению. Используйте infer_objects() или проверяйте диапазоны.
- Категориальный тип неэффективен при большом числе уникальных значений (более 10% от общего числа строк).
Вариант 5: Использование формата Parquet
Parquet - колоночный формат с сжатием, поддерживающий фильтрацию на уровне файла. Pandas читает его быстрее, чем CSV.
df.to_parquet('data.parquet')
df = pd.read_parquet('data.parquet', columns=['needed_cols'])При работе с большими данными рекомендуется сразу сохранять промежуточные результаты в Parquet, а не CSV.
Пример 1: Потоковая агрегация с накоплением в словаре
При большом количестве уникальных групп группировка через список может быть заменена обновлением словаря.
import pandas as pd
from collections import defaultdict
chunksize = 50000
aggregated = defaultdict(float)
for chunk in pd.read_csv('sales.csv', chunksize=chunksize, usecols=['product','revenue']):
for idx, row in chunk.iterrows():
aggregated[row['product']] += row['revenue']
result = pd.Series(aggregated)product A 125000.5 B 89000.2 ... dtype: float64
Этот метод потребляет память, пропорциональную числу уникальных продуктов, а не размеру данных.
Пример 2: Параллельная обработка чанков с помощью multiprocessing
Можно ускорить обработку, распределяя чанки между процессами.
import pandas as pd
from multiprocessing import Pool
import numpy as np
def process_chunk(chunk):
return chunk.groupby('cat')['val'].sum()
chunks = [chunk for chunk in pd.read_csv('data.csv', chunksize=100000)]
with Pool(4) as p:
results = p.map(process_chunk, chunks)
final = pd.concat(results).groupby(level=0).sum()Количество процессов обычно равно числу ядер. Важно, чтобы каждая функция была self-contained и не требовала большого объёма данных между процессами.
Пример 3: Dask с несколькими операциями (фильтрация, join, агрегация)
Dask позволяет выполнять сложные цепочки преобразований на наборах больше памяти.
import dask.dataframe as dd
# загрузка двух датафреймов
df1 = dd.read_csv('orders.csv', blocksize=100e6)
df2 = dd.read_csv('customers.csv', blocksize=100e6)
# фильтрация и объединение
filtered = df1[df1['amount'] > 100]
joined = filtered.merge(df2, on='customer_id')
# агрегация и вычисление
result = joined.groupby('region')['amount'].sum().compute()
print(result)region North America 1.23e6 Europe 0.89e6 ... Name: amount, dtype: float64
Dask выполняет операции лениво, оптимизируя план выполнения.
Пример 4: Оптимизация типов и чтение с указанием формата Parquet
Комбинация компактных типов и Parquet даёт выигрыш по времени и памяти.
import pandas as pd
import pyarrow.parquet as pq
# конвертация CSV в Parquet с оптимизированными типами
schema = {
'id': 'int32',
'name': 'string',
'salary': 'float32',
'city': 'category'
}
df = pd.read_csv('employees.csv', dtype=schema)
df.to_parquet('employees.parquet', compression='snappy')
# чтение только нужных столбцов
df = pd.read_parquet('employees.parquet', columns=['city','salary'])
# анализ памяти
print(df.memory_usage(deep=True))Index 128 city 83502 salary 409600 dtype: int64