Documentation Python

Pythonのasync/awaitを使った非同期プログラミングは、I/O待ちが発生する処理を効率的に扱うための強力な手法です。この記事では、基本概念から実践的な使い方まで解説します。

同期処理と非同期処理の比較

項目同期処理非同期処理
処理の流れ順番に実行並行して実行
I/O待ち中ブロック(待機)他のタスクを実行
コードの複雑さシンプルやや複雑
向いている処理CPU処理I/O処理
スレッド1スレッド1スレッド

並行処理の手法比較

手法用途GILの影響メモリ
asyncioI/O処理受けない軽量
threadingI/O処理受ける中程度
multiprocessingCPU処理受けない重い

async/awaitの基本

コルーチンの定義

import asyncio

# async defで非同期関数(コルーチン)を定義
async def greet(name: str) -> str:
    print(f"{name}に挨拶します...")
    await asyncio.sleep(1)  # 1秒待機(非同期)
    return f"こんにちは、{name}さん!"

# コルーチンの実行
async def main():
    result = await greet("田中")
    print(result)

# Python 3.7以降
asyncio.run(main())

awaitとは

awaitは、非同期処理の完了を待つためのキーワードです。

import asyncio

async def fetch_data():
    print("データ取得開始")
    await asyncio.sleep(2)  # I/O待ちをシミュレート
    print("データ取得完了")
    return {"id": 1, "name": "test"}

async def main():
    # awaitで結果を待つ
    data = await fetch_data()
    print(f"取得したデータ: {data}")

asyncio.run(main())

複数タスクの並行実行

asyncio.gather

複数のコルーチンを同時に実行し、すべての結果を待ちます。

import asyncio
import time

async def task(name: str, duration: int) -> str:
    print(f"タスク {name} 開始")
    await asyncio.sleep(duration)
    print(f"タスク {name} 完了")
    return f"{name}の結果"

async def main():
    start = time.time()

    # 3つのタスクを並行実行
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )

    elapsed = time.time() - start
    print(f"結果: {results}")
    print(f"実行時間: {elapsed:.2f}秒")  # 約3秒(最長のタスク時間)

asyncio.run(main())

出力:

タスク A 開始
タスク B 開始
タスク C 開始
タスク B 完了
タスク A 完了
タスク C 完了
結果: ['Aの結果', 'Bの結果', 'Cの結果']
実行時間: 3.00秒

asyncio.create_task

タスクをバックグラウンドで開始し、後で結果を取得できます。

import asyncio

async def background_task(name: str):
    await asyncio.sleep(2)
    return f"{name}完了"

async def main():
    # タスクを作成(すぐに実行開始)
    task1 = asyncio.create_task(background_task("タスク1"))
    task2 = asyncio.create_task(background_task("タスク2"))

    print("他の処理を実行...")
    await asyncio.sleep(1)
    print("まだタスクは実行中...")

    # 結果を取得
    result1 = await task1
    result2 = await task2

    print(f"結果: {result1}, {result2}")

asyncio.run(main())

asyncio.wait

タスクの完了を細かく制御できます。

import asyncio

async def task(n: int):
    await asyncio.sleep(n)
    return n

async def main():
    tasks = [
        asyncio.create_task(task(3)),
        asyncio.create_task(task(1)),
        asyncio.create_task(task(2))
    ]

    # 最初に完了したタスクから処理
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    for t in done:
        print(f"完了: {t.result()}")

    print(f"残りのタスク: {len(pending)}")

    # 残りを待つ
    if pending:
        done, _ = await asyncio.wait(pending)
        for t in done:
            print(f"完了: {t.result()}")

asyncio.run(main())

タイムアウト処理

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "完了"

async def main():
    try:
        # 3秒でタイムアウト
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("タイムアウトしました")

asyncio.run(main())

複数タスクにタイムアウト

import asyncio

async def fetch_with_timeout(url: str, timeout: float):
    """タイムアウト付きのデータ取得"""
    try:
        async with asyncio.timeout(timeout):
            # 実際のネットワーク処理をシミュレート
            await asyncio.sleep(2)
            return f"{url}のデータ"
    except asyncio.TimeoutError:
        return f"{url}はタイムアウト"

async def main():
    urls = ["url1", "url2", "url3"]

    tasks = [fetch_with_timeout(url, 1.5) for url in urls]
    results = await asyncio.gather(*tasks)

    for result in results:
        print(result)

asyncio.run(main())

エラーハンドリング

基本的なエラー処理

import asyncio

async def risky_task(fail: bool):
    await asyncio.sleep(1)
    if fail:
        raise ValueError("タスクが失敗しました")
    return "成功"

async def main():
    try:
        result = await risky_task(True)
        print(result)
    except ValueError as e:
        print(f"エラー: {e}")

asyncio.run(main())

gatherでのエラー処理

import asyncio

async def task(n: int):
    if n == 2:
        raise ValueError(f"タスク{n}でエラー")
    await asyncio.sleep(n)
    return f"タスク{n}完了"

async def main():
    # return_exceptions=Trueで例外を結果として返す
    results = await asyncio.gather(
        task(1),
        task(2),
        task(3),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"タスク{i+1}: エラー - {result}")
        else:
            print(f"タスク{i+1}: {result}")

asyncio.run(main())

非同期コンテキストマネージャ

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("リソース取得")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("リソース解放")
        await asyncio.sleep(0.1)

    async def process(self):
        print("処理中...")
        await asyncio.sleep(0.5)

async def main():
    async with AsyncResource() as resource:
        await resource.process()

asyncio.run(main())

非同期イテレータ

import asyncio

class AsyncRange:
    def __init__(self, start: int, end: int):
        self.start = start
        self.end = end
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # 非同期処理
        value = self.current
        self.current += 1
        return value

async def main():
    async for num in AsyncRange(0, 5):
        print(num)

asyncio.run(main())

実践的な例

非同期HTTPリクエスト

import asyncio
import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """URLからデータを取得"""
    try:
        async with session.get(url) as response:
            return {
                "url": url,
                "status": response.status,
                "length": len(await response.text())
            }
    except Exception as e:
        return {"url": url, "error": str(e)}

async def fetch_all(urls: list[str]) -> list[dict]:
    """複数URLを並行取得"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]

    print("取得開始...")
    results = await fetch_all(urls)

    for result in results:
        print(result)

asyncio.run(main())

非同期ファイル処理

import asyncio
import aiofiles

async def read_file(filepath: str) -> str:
    """非同期でファイルを読み込む"""
    async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
        return await f.read()

async def write_file(filepath: str, content: str):
    """非同期でファイルに書き込む"""
    async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
        await f.write(content)

async def process_files(input_files: list[str], output_dir: str):
    """複数ファイルを並行処理"""
    async def process_one(filepath: str):
        content = await read_file(filepath)
        # 何らかの処理
        processed = content.upper()
        output_path = f"{output_dir}/{filepath.split('/')[-1]}"
        await write_file(output_path, processed)
        return filepath

    tasks = [process_one(f) for f in input_files]
    return await asyncio.gather(*tasks)

セマフォによる同時実行数制限

import asyncio

async def limited_task(semaphore: asyncio.Semaphore, name: str):
    """セマフォで同時実行数を制限"""
    async with semaphore:
        print(f"{name} 開始")
        await asyncio.sleep(1)
        print(f"{name} 完了")
        return name

async def main():
    # 最大3つまで同時実行
    semaphore = asyncio.Semaphore(3)

    tasks = [
        limited_task(semaphore, f"タスク{i}")
        for i in range(10)
    ]

    results = await asyncio.gather(*tasks)
    print(f"完了: {len(results)}件")

asyncio.run(main())

プロデューサー/コンシューマーパターン

import asyncio
import random

async def producer(queue: asyncio.Queue, name: str):
    """データを生成してキューに追加"""
    for i in range(5):
        item = f"{name}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"[{name}] 生成: {item}")
    await queue.put(None)  # 終了シグナル

async def consumer(queue: asyncio.Queue, name: str):
    """キューからデータを取り出して処理"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"[{name}] 処理: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)

    # プロデューサーとコンシューマーを並行実行
    await asyncio.gather(
        producer(queue, "P1"),
        consumer(queue, "C1"),
        consumer(queue, "C2")
    )

asyncio.run(main())

同期コードとの連携

import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_io():
    """ブロッキングI/O処理(同期)"""
    import time
    time.sleep(2)
    return "完了"

async def main():
    loop = asyncio.get_running_loop()

    # 別スレッドでブロッキング処理を実行
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, blocking_io)
        print(result)

asyncio.run(main())

よくある間違い

import asyncio

# NG: awaitを忘れる
async def bad_example():
    asyncio.sleep(1)  # awaitがない!何も待たない
    print("すぐに実行される")

# OK: awaitを付ける
async def good_example():
    await asyncio.sleep(1)
    print("1秒後に実行される")

# NG: 同期関数内でawait
def sync_function():
    # await asyncio.sleep(1)  # SyntaxError!
    pass

# NG: time.sleepを使う(ブロッキング)
async def blocking_sleep():
    import time
    time.sleep(1)  # イベントループがブロックされる!

まとめ

用途推奨方法
複数タスクの並行実行asyncio.gather()
バックグラウンドタスクasyncio.create_task()
タイムアウトasyncio.wait_for() / asyncio.timeout()
同時実行数制限asyncio.Semaphore
非同期キューasyncio.Queue
ブロッキング処理の実行loop.run_in_executor()

非同期プログラミングは、I/O処理が多いアプリケーション(Webサーバー、APIクライアント、スクレイピング等)で特に効果を発揮します。

参考文献

円