Pythonのasync/awaitを使った非同期プログラミングは、I/O待ちが発生する処理を効率的に扱うための強力な手法です。この記事では、基本概念から実践的な使い方まで解説します。
同期処理と非同期処理の比較
| 項目 | 同期処理 | 非同期処理 |
|---|---|---|
| 処理の流れ | 順番に実行 | 並行して実行 |
| I/O待ち中 | ブロック(待機) | 他のタスクを実行 |
| コードの複雑さ | シンプル | やや複雑 |
| 向いている処理 | CPU処理 | I/O処理 |
| スレッド | 1スレッド | 1スレッド |
並行処理の手法比較
| 手法 | 用途 | GILの影響 | メモリ |
|---|---|---|---|
asyncio | I/O処理 | 受けない | 軽量 |
threading | I/O処理 | 受ける | 中程度 |
multiprocessing | CPU処理 | 受けない | 重い |
async/awaitの基本
コルーチンの定義
import asyncio
# async defで非同期関数(コルーチン)を定義
async def greet(name: str) -> str:
print(f"{name}に挨拶します...")
await asyncio.sleep(1) # 1秒待機(非同期)
return f"こんにちは、{name}さん!"
# コルーチンの実行
async def main():
result = await greet("田中")
print(result)
# Python 3.7以降
asyncio.run(main())
awaitとは
awaitは、非同期処理の完了を待つためのキーワードです。
import asyncio
async def fetch_data():
print("データ取得開始")
await asyncio.sleep(2) # I/O待ちをシミュレート
print("データ取得完了")
return {"id": 1, "name": "test"}
async def main():
# awaitで結果を待つ
data = await fetch_data()
print(f"取得したデータ: {data}")
asyncio.run(main())
複数タスクの並行実行
asyncio.gather
複数のコルーチンを同時に実行し、すべての結果を待ちます。
import asyncio
import time
async def task(name: str, duration: int) -> str:
print(f"タスク {name} 開始")
await asyncio.sleep(duration)
print(f"タスク {name} 完了")
return f"{name}の結果"
async def main():
start = time.time()
# 3つのタスクを並行実行
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
elapsed = time.time() - start
print(f"結果: {results}")
print(f"実行時間: {elapsed:.2f}秒") # 約3秒(最長のタスク時間)
asyncio.run(main())
出力:
タスク A 開始
タスク B 開始
タスク C 開始
タスク B 完了
タスク A 完了
タスク C 完了
結果: ['Aの結果', 'Bの結果', 'Cの結果']
実行時間: 3.00秒
asyncio.create_task
タスクをバックグラウンドで開始し、後で結果を取得できます。
import asyncio
async def background_task(name: str):
await asyncio.sleep(2)
return f"{name}完了"
async def main():
# タスクを作成(すぐに実行開始)
task1 = asyncio.create_task(background_task("タスク1"))
task2 = asyncio.create_task(background_task("タスク2"))
print("他の処理を実行...")
await asyncio.sleep(1)
print("まだタスクは実行中...")
# 結果を取得
result1 = await task1
result2 = await task2
print(f"結果: {result1}, {result2}")
asyncio.run(main())
asyncio.wait
タスクの完了を細かく制御できます。
import asyncio
async def task(n: int):
await asyncio.sleep(n)
return n
async def main():
tasks = [
asyncio.create_task(task(3)),
asyncio.create_task(task(1)),
asyncio.create_task(task(2))
]
# 最初に完了したタスクから処理
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
for t in done:
print(f"完了: {t.result()}")
print(f"残りのタスク: {len(pending)}")
# 残りを待つ
if pending:
done, _ = await asyncio.wait(pending)
for t in done:
print(f"完了: {t.result()}")
asyncio.run(main())
タイムアウト処理
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完了"
async def main():
try:
# 3秒でタイムアウト
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("タイムアウトしました")
asyncio.run(main())
複数タスクにタイムアウト
import asyncio
async def fetch_with_timeout(url: str, timeout: float):
"""タイムアウト付きのデータ取得"""
try:
async with asyncio.timeout(timeout):
# 実際のネットワーク処理をシミュレート
await asyncio.sleep(2)
return f"{url}のデータ"
except asyncio.TimeoutError:
return f"{url}はタイムアウト"
async def main():
urls = ["url1", "url2", "url3"]
tasks = [fetch_with_timeout(url, 1.5) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
エラーハンドリング
基本的なエラー処理
import asyncio
async def risky_task(fail: bool):
await asyncio.sleep(1)
if fail:
raise ValueError("タスクが失敗しました")
return "成功"
async def main():
try:
result = await risky_task(True)
print(result)
except ValueError as e:
print(f"エラー: {e}")
asyncio.run(main())
gatherでのエラー処理
import asyncio
async def task(n: int):
if n == 2:
raise ValueError(f"タスク{n}でエラー")
await asyncio.sleep(n)
return f"タスク{n}完了"
async def main():
# return_exceptions=Trueで例外を結果として返す
results = await asyncio.gather(
task(1),
task(2),
task(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"タスク{i+1}: エラー - {result}")
else:
print(f"タスク{i+1}: {result}")
asyncio.run(main())
非同期コンテキストマネージャ
import asyncio
class AsyncResource:
async def __aenter__(self):
print("リソース取得")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("リソース解放")
await asyncio.sleep(0.1)
async def process(self):
print("処理中...")
await asyncio.sleep(0.5)
async def main():
async with AsyncResource() as resource:
await resource.process()
asyncio.run(main())
非同期イテレータ
import asyncio
class AsyncRange:
def __init__(self, start: int, end: int):
self.start = start
self.end = end
self.current = start
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1) # 非同期処理
value = self.current
self.current += 1
return value
async def main():
async for num in AsyncRange(0, 5):
print(num)
asyncio.run(main())
実践的な例
非同期HTTPリクエスト
import asyncio
import aiohttp
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""URLからデータを取得"""
try:
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"length": len(await response.text())
}
except Exception as e:
return {"url": url, "error": str(e)}
async def fetch_all(urls: list[str]) -> list[dict]:
"""複数URLを並行取得"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1"
]
print("取得開始...")
results = await fetch_all(urls)
for result in results:
print(result)
asyncio.run(main())
非同期ファイル処理
import asyncio
import aiofiles
async def read_file(filepath: str) -> str:
"""非同期でファイルを読み込む"""
async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
return await f.read()
async def write_file(filepath: str, content: str):
"""非同期でファイルに書き込む"""
async with aiofiles.open(filepath, 'w', encoding='utf-8') as f:
await f.write(content)
async def process_files(input_files: list[str], output_dir: str):
"""複数ファイルを並行処理"""
async def process_one(filepath: str):
content = await read_file(filepath)
# 何らかの処理
processed = content.upper()
output_path = f"{output_dir}/{filepath.split('/')[-1]}"
await write_file(output_path, processed)
return filepath
tasks = [process_one(f) for f in input_files]
return await asyncio.gather(*tasks)
セマフォによる同時実行数制限
import asyncio
async def limited_task(semaphore: asyncio.Semaphore, name: str):
"""セマフォで同時実行数を制限"""
async with semaphore:
print(f"{name} 開始")
await asyncio.sleep(1)
print(f"{name} 完了")
return name
async def main():
# 最大3つまで同時実行
semaphore = asyncio.Semaphore(3)
tasks = [
limited_task(semaphore, f"タスク{i}")
for i in range(10)
]
results = await asyncio.gather(*tasks)
print(f"完了: {len(results)}件")
asyncio.run(main())
プロデューサー/コンシューマーパターン
import asyncio
import random
async def producer(queue: asyncio.Queue, name: str):
"""データを生成してキューに追加"""
for i in range(5):
item = f"{name}-item-{i}"
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(item)
print(f"[{name}] 生成: {item}")
await queue.put(None) # 終了シグナル
async def consumer(queue: asyncio.Queue, name: str):
"""キューからデータを取り出して処理"""
while True:
item = await queue.get()
if item is None:
break
print(f"[{name}] 処理: {item}")
await asyncio.sleep(random.uniform(0.1, 0.3))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
# プロデューサーとコンシューマーを並行実行
await asyncio.gather(
producer(queue, "P1"),
consumer(queue, "C1"),
consumer(queue, "C2")
)
asyncio.run(main())
同期コードとの連携
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
"""ブロッキングI/O処理(同期)"""
import time
time.sleep(2)
return "完了"
async def main():
loop = asyncio.get_running_loop()
# 別スレッドでブロッキング処理を実行
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(executor, blocking_io)
print(result)
asyncio.run(main())
よくある間違い
import asyncio
# NG: awaitを忘れる
async def bad_example():
asyncio.sleep(1) # awaitがない!何も待たない
print("すぐに実行される")
# OK: awaitを付ける
async def good_example():
await asyncio.sleep(1)
print("1秒後に実行される")
# NG: 同期関数内でawait
def sync_function():
# await asyncio.sleep(1) # SyntaxError!
pass
# NG: time.sleepを使う(ブロッキング)
async def blocking_sleep():
import time
time.sleep(1) # イベントループがブロックされる!
まとめ
| 用途 | 推奨方法 |
|---|---|
| 複数タスクの並行実行 | asyncio.gather() |
| バックグラウンドタスク | asyncio.create_task() |
| タイムアウト | asyncio.wait_for() / asyncio.timeout() |
| 同時実行数制限 | asyncio.Semaphore |
| 非同期キュー | asyncio.Queue |
| ブロッキング処理の実行 | loop.run_in_executor() |
非同期プログラミングは、I/O処理が多いアプリケーション(Webサーバー、APIクライアント、スクレイピング等)で特に効果を発揮します。