Documentation Python

Pandasで大規模データセットを効率的に処理するための最適化手法を解説します。メモリ管理、チャンク処理、代替ライブラリの活用まで、実践的なテクニックを紹介します。

大規模データ処理ライブラリの比較

ライブラリデータサイズ処理速度メモリ効率学習コスト
Pandas~10GB
Dask10GB~
Polars~100GB
PySparkTB~
Vaex100GB~

大規模データ処理の課題

課題原因解決策
メモリ不足全データをメモリにロードチャンク処理、型最適化
処理速度の低下単一スレッド処理並列処理、ベクトル化
ファイル読み込み時間I/Oボトルネックパーケット形式、列選択

チャンク処理によるメモリ効率化

基本的なチャンク処理

import pandas as pd

def process_large_csv(filepath: str, chunk_size: int = 100000):
    """大規模CSVをチャンク単位で処理"""

    results = []

    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # 各チャンクに対して処理を実行
        chunk_result = chunk.groupby('category')['value'].sum()
        results.append(chunk_result)

    # 結果を結合
    final_result = pd.concat(results).groupby(level=0).sum()
    return final_result

# 使用例
result = process_large_csv('large_dataset.csv')
print(result)

条件付きフィルタリング

import pandas as pd

def filter_large_csv(
    filepath: str,
    condition_column: str,
    condition_value,
    chunk_size: int = 100000
):
    """条件に合う行のみを抽出"""

    filtered_chunks = []

    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # 条件に合う行のみを保持
        filtered = chunk[chunk[condition_column] == condition_value]
        if not filtered.empty:
            filtered_chunks.append(filtered)

    if filtered_chunks:
        return pd.concat(filtered_chunks, ignore_index=True)
    return pd.DataFrame()

# 使用例
df = filter_large_csv('sales_data.csv', 'region', 'Tokyo')

メモリ使用量のモニタリング

import pandas as pd

def get_memory_usage(df: pd.DataFrame) -> str:
    """データフレームのメモリ使用量を取得"""
    memory_bytes = df.memory_usage(deep=True).sum()

    if memory_bytes < 1024:
        return f"{memory_bytes} B"
    elif memory_bytes < 1024 ** 2:
        return f"{memory_bytes / 1024:.2f} KB"
    elif memory_bytes < 1024 ** 3:
        return f"{memory_bytes / 1024 ** 2:.2f} MB"
    else:
        return f"{memory_bytes / 1024 ** 3:.2f} GB"

# 使用例
df = pd.read_csv('data.csv')
print(f"メモリ使用量: {get_memory_usage(df)}")

# 詳細なメモリ使用量
print(df.info(memory_usage='deep'))

データ型の最適化

自動型最適化関数

import pandas as pd
import numpy as np

def optimize_dtypes(df: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
    """データフレームのデータ型を最適化してメモリを節約"""

    initial_memory = df.memory_usage(deep=True).sum()

    for col in df.columns:
        col_type = df[col].dtype

        # 数値型の最適化
        if col_type in ['int64', 'int32']:
            c_min = df[col].min()
            c_max = df[col].max()

            if c_min >= 0:
                if c_max < 255:
                    df[col] = df[col].astype(np.uint8)
                elif c_max < 65535:
                    df[col] = df[col].astype(np.uint16)
                elif c_max < 4294967295:
                    df[col] = df[col].astype(np.uint32)
            else:
                if c_min > -128 and c_max < 127:
                    df[col] = df[col].astype(np.int8)
                elif c_min > -32768 and c_max < 32767:
                    df[col] = df[col].astype(np.int16)
                elif c_min > -2147483648 and c_max < 2147483647:
                    df[col] = df[col].astype(np.int32)

        elif col_type == 'float64':
            df[col] = df[col].astype(np.float32)

        # カテゴリ型への変換(ユニーク値が少ない場合)
        elif col_type == 'object':
            num_unique = df[col].nunique()
            num_total = len(df[col])
            if num_unique / num_total < 0.5:  # 50%未満がユニーク
                df[col] = df[col].astype('category')

    final_memory = df.memory_usage(deep=True).sum()

    if verbose:
        reduction = (1 - final_memory / initial_memory) * 100
        print(f"メモリ使用量: {initial_memory / 1024**2:.2f} MB → {final_memory / 1024**2:.2f} MB")
        print(f"削減率: {reduction:.1f}%")

    return df

# 使用例
df = pd.read_csv('large_dataset.csv')
df = optimize_dtypes(df)

読み込み時の型指定

import pandas as pd
import numpy as np

# 型を明示的に指定して読み込み
dtypes = {
    'id': np.int32,
    'category': 'category',
    'value': np.float32,
    'count': np.int16,
    'flag': 'bool'
}

df = pd.read_csv(
    'large_dataset.csv',
    dtype=dtypes,
    usecols=['id', 'category', 'value', 'count', 'flag']  # 必要な列のみ
)

効率的なファイル形式

Parquet形式(推奨)

import pandas as pd

# Parquetで保存(圧縮あり)
df = pd.read_csv('large_dataset.csv')
df.to_parquet('data.parquet', compression='snappy', index=False)

# Parquetから読み込み(高速)
df = pd.read_parquet('data.parquet')

# 特定の列のみ読み込み
df = pd.read_parquet('data.parquet', columns=['id', 'value'])

ファイル形式の比較

import pandas as pd
import time

def compare_file_formats(df: pd.DataFrame):
    """ファイル形式別の読み書き速度を比較"""

    results = {}

    # CSV
    start = time.time()
    df.to_csv('test.csv', index=False)
    write_time = time.time() - start

    start = time.time()
    pd.read_csv('test.csv')
    read_time = time.time() - start
    results['CSV'] = {'write': write_time, 'read': read_time}

    # Parquet
    start = time.time()
    df.to_parquet('test.parquet', index=False)
    write_time = time.time() - start

    start = time.time()
    pd.read_parquet('test.parquet')
    read_time = time.time() - start
    results['Parquet'] = {'write': write_time, 'read': read_time}

    # Feather
    start = time.time()
    df.to_feather('test.feather')
    write_time = time.time() - start

    start = time.time()
    pd.read_feather('test.feather')
    read_time = time.time() - start
    results['Feather'] = {'write': write_time, 'read': read_time}

    return pd.DataFrame(results).T

# 使用例
df = pd.DataFrame({'a': range(1000000), 'b': range(1000000)})
print(compare_file_formats(df))

Daskによる並列処理

import dask.dataframe as dd

# Daskでの読み込み(遅延評価)
ddf = dd.read_csv('large_dataset.csv')

# Pandasと同様の操作
result = ddf.groupby('category')['value'].sum()

# 実際に計算を実行
result_df = result.compute()
print(result_df)

Daskでの大規模データ処理

import dask.dataframe as dd

def process_with_dask(filepath: str):
    """Daskを使った大規模データ処理"""

    # 読み込み
    ddf = dd.read_csv(filepath)

    # フィルタリング
    filtered = ddf[ddf['value'] > 100]

    # グループ化と集計
    grouped = filtered.groupby('category').agg({
        'value': ['mean', 'sum', 'count']
    })

    # 結果を取得
    return grouped.compute()

# 複数ファイルの処理
ddf = dd.read_csv('data_*.csv')  # ワイルドカードで複数ファイル

Daskの設定

import dask
from dask.distributed import Client

# 分散処理の設定
client = Client(n_workers=4, threads_per_worker=2)
print(client)

# メモリ制限の設定
dask.config.set({'dataframe.shuffle.compression': True})

Polarsによる高速処理

import polars as pl

# Polarsでの読み込み(高速)
df = pl.read_csv('large_dataset.csv')

# Lazy評価による最適化
lazy_df = pl.scan_csv('large_dataset.csv')

result = (
    lazy_df
    .filter(pl.col('value') > 100)
    .group_by('category')
    .agg([
        pl.col('value').mean().alias('mean_value'),
        pl.col('value').sum().alias('total_value'),
        pl.count().alias('count')
    ])
    .collect()  # 実行
)

print(result)

PolarsとPandasの連携

import polars as pl
import pandas as pd

# Pandas → Polars
pandas_df = pd.read_csv('data.csv')
polars_df = pl.from_pandas(pandas_df)

# Polars → Pandas
polars_df = pl.read_csv('data.csv')
pandas_df = polars_df.to_pandas()

実践的なパイプライン

import pandas as pd
import numpy as np
from typing import Generator, Dict, Any

class BigDataProcessor:
    """大規模データ処理パイプライン"""

    def __init__(self, chunk_size: int = 100000):
        self.chunk_size = chunk_size
        self.results: Dict[str, Any] = {}

    def read_chunks(self, filepath: str) -> Generator[pd.DataFrame, None, None]:
        """チャンク単位でデータを読み込み"""
        for chunk in pd.read_csv(filepath, chunksize=self.chunk_size):
            yield self._optimize_chunk(chunk)

    def _optimize_chunk(self, df: pd.DataFrame) -> pd.DataFrame:
        """チャンクのメモリを最適化"""
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')
        return df

    def process(self, filepath: str, aggregations: Dict[str, str]) -> pd.DataFrame:
        """データを処理して集計"""
        chunk_results = []

        for chunk in self.read_chunks(filepath):
            # 各チャンクで集計
            agg_result = chunk.groupby('category').agg(aggregations)
            chunk_results.append(agg_result)

        # 結果を結合
        combined = pd.concat(chunk_results)

        # 再集計
        final = combined.groupby(level=0).agg({
            col: 'sum' if agg == 'sum' else 'mean'
            for col, agg in aggregations.items()
        })

        return final

# 使用例
processor = BigDataProcessor(chunk_size=50000)
result = processor.process(
    'large_sales_data.csv',
    {'revenue': 'sum', 'quantity': 'sum'}
)
print(result)

ベストプラクティス

メモリ効率化のチェックリスト

# 1. 必要な列のみ読み込み
df = pd.read_csv('data.csv', usecols=['col1', 'col2'])

# 2. 適切なデータ型を指定
df = pd.read_csv('data.csv', dtype={'id': 'int32', 'category': 'category'})

# 3. 不要なデータは早めに削除
del df['unnecessary_column']
df = df[df['value'] > 0]  # フィルタリング

# 4. インプレース操作を活用
df.drop(columns=['temp'], inplace=True)
df.reset_index(drop=True, inplace=True)

# 5. 明示的なガベージコレクション
import gc
del large_df
gc.collect()

まとめ

データサイズ推奨ツール主な最適化手法
~1GBPandas型最適化、列選択
1-10GBPandas + チャンクチャンク処理、Parquet
10-100GBDask / Polars並列処理、遅延評価
100GB~PySpark分散処理

大規模データを扱う際は、データ型の最適化、チャンク処理、適切なファイル形式の選択が重要です。データサイズに応じて、Dask、Polars、PySparkなどの代替ツールも検討しましょう。

参考文献

円