Pandasで大規模データセットを効率的に処理するための最適化手法を解説します。メモリ管理、チャンク処理、代替ライブラリの活用まで、実践的なテクニックを紹介します。
大規模データ処理ライブラリの比較
| ライブラリ | データサイズ | 処理速度 | メモリ効率 | 学習コスト |
|---|
| Pandas | ~10GB | 中 | 低 | 低 |
| Dask | 10GB~ | 中 | 高 | 中 |
| Polars | ~100GB | 高 | 高 | 中 |
| PySpark | TB~ | 高 | 高 | 高 |
| Vaex | 100GB~ | 高 | 高 | 中 |
大規模データ処理の課題
| 課題 | 原因 | 解決策 |
|---|
| メモリ不足 | 全データをメモリにロード | チャンク処理、型最適化 |
| 処理速度の低下 | 単一スレッド処理 | 並列処理、ベクトル化 |
| ファイル読み込み時間 | I/Oボトルネック | パーケット形式、列選択 |
チャンク処理によるメモリ効率化
基本的なチャンク処理
import pandas as pd
def process_large_csv(filepath: str, chunk_size: int = 100000):
"""大規模CSVをチャンク単位で処理"""
results = []
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
# 各チャンクに対して処理を実行
chunk_result = chunk.groupby('category')['value'].sum()
results.append(chunk_result)
# 結果を結合
final_result = pd.concat(results).groupby(level=0).sum()
return final_result
# 使用例
result = process_large_csv('large_dataset.csv')
print(result)
条件付きフィルタリング
import pandas as pd
def filter_large_csv(
filepath: str,
condition_column: str,
condition_value,
chunk_size: int = 100000
):
"""条件に合う行のみを抽出"""
filtered_chunks = []
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
# 条件に合う行のみを保持
filtered = chunk[chunk[condition_column] == condition_value]
if not filtered.empty:
filtered_chunks.append(filtered)
if filtered_chunks:
return pd.concat(filtered_chunks, ignore_index=True)
return pd.DataFrame()
# 使用例
df = filter_large_csv('sales_data.csv', 'region', 'Tokyo')
メモリ使用量のモニタリング
import pandas as pd
def get_memory_usage(df: pd.DataFrame) -> str:
"""データフレームのメモリ使用量を取得"""
memory_bytes = df.memory_usage(deep=True).sum()
if memory_bytes < 1024:
return f"{memory_bytes} B"
elif memory_bytes < 1024 ** 2:
return f"{memory_bytes / 1024:.2f} KB"
elif memory_bytes < 1024 ** 3:
return f"{memory_bytes / 1024 ** 2:.2f} MB"
else:
return f"{memory_bytes / 1024 ** 3:.2f} GB"
# 使用例
df = pd.read_csv('data.csv')
print(f"メモリ使用量: {get_memory_usage(df)}")
# 詳細なメモリ使用量
print(df.info(memory_usage='deep'))
データ型の最適化
自動型最適化関数
import pandas as pd
import numpy as np
def optimize_dtypes(df: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
"""データフレームのデータ型を最適化してメモリを節約"""
initial_memory = df.memory_usage(deep=True).sum()
for col in df.columns:
col_type = df[col].dtype
# 数値型の最適化
if col_type in ['int64', 'int32']:
c_min = df[col].min()
c_max = df[col].max()
if c_min >= 0:
if c_max < 255:
df[col] = df[col].astype(np.uint8)
elif c_max < 65535:
df[col] = df[col].astype(np.uint16)
elif c_max < 4294967295:
df[col] = df[col].astype(np.uint32)
else:
if c_min > -128 and c_max < 127:
df[col] = df[col].astype(np.int8)
elif c_min > -32768 and c_max < 32767:
df[col] = df[col].astype(np.int16)
elif c_min > -2147483648 and c_max < 2147483647:
df[col] = df[col].astype(np.int32)
elif col_type == 'float64':
df[col] = df[col].astype(np.float32)
# カテゴリ型への変換(ユニーク値が少ない場合)
elif col_type == 'object':
num_unique = df[col].nunique()
num_total = len(df[col])
if num_unique / num_total < 0.5: # 50%未満がユニーク
df[col] = df[col].astype('category')
final_memory = df.memory_usage(deep=True).sum()
if verbose:
reduction = (1 - final_memory / initial_memory) * 100
print(f"メモリ使用量: {initial_memory / 1024**2:.2f} MB → {final_memory / 1024**2:.2f} MB")
print(f"削減率: {reduction:.1f}%")
return df
# 使用例
df = pd.read_csv('large_dataset.csv')
df = optimize_dtypes(df)
読み込み時の型指定
import pandas as pd
import numpy as np
# 型を明示的に指定して読み込み
dtypes = {
'id': np.int32,
'category': 'category',
'value': np.float32,
'count': np.int16,
'flag': 'bool'
}
df = pd.read_csv(
'large_dataset.csv',
dtype=dtypes,
usecols=['id', 'category', 'value', 'count', 'flag'] # 必要な列のみ
)
効率的なファイル形式
Parquet形式(推奨)
import pandas as pd
# Parquetで保存(圧縮あり)
df = pd.read_csv('large_dataset.csv')
df.to_parquet('data.parquet', compression='snappy', index=False)
# Parquetから読み込み(高速)
df = pd.read_parquet('data.parquet')
# 特定の列のみ読み込み
df = pd.read_parquet('data.parquet', columns=['id', 'value'])
ファイル形式の比較
import pandas as pd
import time
def compare_file_formats(df: pd.DataFrame):
"""ファイル形式別の読み書き速度を比較"""
results = {}
# CSV
start = time.time()
df.to_csv('test.csv', index=False)
write_time = time.time() - start
start = time.time()
pd.read_csv('test.csv')
read_time = time.time() - start
results['CSV'] = {'write': write_time, 'read': read_time}
# Parquet
start = time.time()
df.to_parquet('test.parquet', index=False)
write_time = time.time() - start
start = time.time()
pd.read_parquet('test.parquet')
read_time = time.time() - start
results['Parquet'] = {'write': write_time, 'read': read_time}
# Feather
start = time.time()
df.to_feather('test.feather')
write_time = time.time() - start
start = time.time()
pd.read_feather('test.feather')
read_time = time.time() - start
results['Feather'] = {'write': write_time, 'read': read_time}
return pd.DataFrame(results).T
# 使用例
df = pd.DataFrame({'a': range(1000000), 'b': range(1000000)})
print(compare_file_formats(df))
Daskによる並列処理
import dask.dataframe as dd
# Daskでの読み込み(遅延評価)
ddf = dd.read_csv('large_dataset.csv')
# Pandasと同様の操作
result = ddf.groupby('category')['value'].sum()
# 実際に計算を実行
result_df = result.compute()
print(result_df)
Daskでの大規模データ処理
import dask.dataframe as dd
def process_with_dask(filepath: str):
"""Daskを使った大規模データ処理"""
# 読み込み
ddf = dd.read_csv(filepath)
# フィルタリング
filtered = ddf[ddf['value'] > 100]
# グループ化と集計
grouped = filtered.groupby('category').agg({
'value': ['mean', 'sum', 'count']
})
# 結果を取得
return grouped.compute()
# 複数ファイルの処理
ddf = dd.read_csv('data_*.csv') # ワイルドカードで複数ファイル
Daskの設定
import dask
from dask.distributed import Client
# 分散処理の設定
client = Client(n_workers=4, threads_per_worker=2)
print(client)
# メモリ制限の設定
dask.config.set({'dataframe.shuffle.compression': True})
Polarsによる高速処理
import polars as pl
# Polarsでの読み込み(高速)
df = pl.read_csv('large_dataset.csv')
# Lazy評価による最適化
lazy_df = pl.scan_csv('large_dataset.csv')
result = (
lazy_df
.filter(pl.col('value') > 100)
.group_by('category')
.agg([
pl.col('value').mean().alias('mean_value'),
pl.col('value').sum().alias('total_value'),
pl.count().alias('count')
])
.collect() # 実行
)
print(result)
PolarsとPandasの連携
import polars as pl
import pandas as pd
# Pandas → Polars
pandas_df = pd.read_csv('data.csv')
polars_df = pl.from_pandas(pandas_df)
# Polars → Pandas
polars_df = pl.read_csv('data.csv')
pandas_df = polars_df.to_pandas()
実践的なパイプライン
import pandas as pd
import numpy as np
from typing import Generator, Dict, Any
class BigDataProcessor:
"""大規模データ処理パイプライン"""
def __init__(self, chunk_size: int = 100000):
self.chunk_size = chunk_size
self.results: Dict[str, Any] = {}
def read_chunks(self, filepath: str) -> Generator[pd.DataFrame, None, None]:
"""チャンク単位でデータを読み込み"""
for chunk in pd.read_csv(filepath, chunksize=self.chunk_size):
yield self._optimize_chunk(chunk)
def _optimize_chunk(self, df: pd.DataFrame) -> pd.DataFrame:
"""チャンクのメモリを最適化"""
for col in df.select_dtypes(include=['object']).columns:
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
return df
def process(self, filepath: str, aggregations: Dict[str, str]) -> pd.DataFrame:
"""データを処理して集計"""
chunk_results = []
for chunk in self.read_chunks(filepath):
# 各チャンクで集計
agg_result = chunk.groupby('category').agg(aggregations)
chunk_results.append(agg_result)
# 結果を結合
combined = pd.concat(chunk_results)
# 再集計
final = combined.groupby(level=0).agg({
col: 'sum' if agg == 'sum' else 'mean'
for col, agg in aggregations.items()
})
return final
# 使用例
processor = BigDataProcessor(chunk_size=50000)
result = processor.process(
'large_sales_data.csv',
{'revenue': 'sum', 'quantity': 'sum'}
)
print(result)
ベストプラクティス
メモリ効率化のチェックリスト
# 1. 必要な列のみ読み込み
df = pd.read_csv('data.csv', usecols=['col1', 'col2'])
# 2. 適切なデータ型を指定
df = pd.read_csv('data.csv', dtype={'id': 'int32', 'category': 'category'})
# 3. 不要なデータは早めに削除
del df['unnecessary_column']
df = df[df['value'] > 0] # フィルタリング
# 4. インプレース操作を活用
df.drop(columns=['temp'], inplace=True)
df.reset_index(drop=True, inplace=True)
# 5. 明示的なガベージコレクション
import gc
del large_df
gc.collect()
まとめ
| データサイズ | 推奨ツール | 主な最適化手法 |
|---|
| ~1GB | Pandas | 型最適化、列選択 |
| 1-10GB | Pandas + チャンク | チャンク処理、Parquet |
| 10-100GB | Dask / Polars | 並列処理、遅延評価 |
| 100GB~ | PySpark | 分散処理 |
大規模データを扱う際は、データ型の最適化、チャンク処理、適切なファイル形式の選択が重要です。データサイズに応じて、Dask、Polars、PySparkなどの代替ツールも検討しましょう。
参考文献