|

ABC DX Tech Internship 2024 参加者募集中!ぜひご応募ください!

2024-03-28

Tips

Pythonで非同期処理をするときのasyncioの使い方

Pythonライブラリ非同期処理

Pythonで非同期処理がしたいときに使うライブラリ: asyncio

Pythonと非同期処理の歴史

すっかり市民権を得たように感じるスクリプト言語といえば、そう、Pythonです。

思えば、後方互換性をぶった切ったPython3系の黎明期にはライブラリの互換性の多くが崩壊し、過去のコード資産は亡きものとまでは言いませんが作り直しを余儀なきされたものも多くあったはずですが、このような時代に生きていると、あれは英断だったなと思いますね。

 

そんなPythonですが、非同期処理が出来るって知っていましたか?

asyncio というライブラリモジュールを使うのですが、初登場はWhat’s new in Python 3.4

asyncio: New provisional API for asynchronous IO (PEP 3156).

とある通り、3.4です。この時点ではまだ provisional でした。

その後、What’s new in Python 3.6 には

Starting with Python 3.6 the asyncio module is no longer provisional and its API is considered stable.

とありまして、stableになったのはPython3.6からです。

 

Python 3.6は2016年12月23日にリリースされているので、もうリリースから7年ちょい経っていますね。

 

非同期処理と並列処理

と、ここまで、非同期処理、非同期処理と言ってきましたが、非同期処理って何かわかりますでしょうか。

よく混同されがちな非同期処理と並列処理の違いをまとめておきます。

 

非同期処理

非同期処理は、タスクの完了を待つことをせずに次のタスクを実行する方式です。

通常Pythonはコードで言うところの「上から順」に処理を進めますが、重たいファイルのInput/Outputの処理やAPI等からの応答待ちの時間を待っていると勿体ないですよね。

そこでその完了を待たずに先に出来る別のことをさせておくことが出来るということです。

 

並列処理

こちらはマルチコアのプロセッサでタスクを分割して行う方式です。ということは、単純に馬力を上げに行く方式と言えます。なので使いどころとしては重たい計算になります。

Pythonの場合は、 multiprocessing というライブラリで並列処理を行います。

 

今回扱うのは、非同期処理の話ですが、使いどころに応じてこれらの使い分けは必要です。

一番手軽に使うシーンが出てくるのが非同期処理だと思います。

JavaScript/TypeScriptを書いている人からしたら、逆にそちらがデフォルトなので、常に意識しているものであるかとも思います。ここぞというときに想像と違う挙動を示してきたりしますしね…

 

Pythonで非同期処理をしてみるコード

そんな非同期処理をPythonで行うためのコードを見ていきます。

 

まずは、 async def で関数を定義し、それを同期処理的に走らせる方法です。

import asyncio async def async_sleep(sec): await asyncio.sleep(sec) print(f"waited time: {sec}s") async def main(): await async_sleep(10) await async_sleep(20) print("finished") asyncio.run(main())

この場合、 await のところで完了を待ちますので、10秒 + 20秒 = 30秒かかります。

 

あと、もう一つのポイントとして、 asyncio.run として関数を実行しないと、下記のようなエラーが出ます。

RuntimeWarning: coroutine 'main' was never awaited main()

グローバルなポジションに await は書けないので、 await main() なんて書き方も出来ません。

 

では、これで10秒sleepするタスクを待たずに20秒sleepするタスクに移るにはどうするか?

asyncio.create_task を使う方法があります。

import asyncio async def async_sleep(sec): await asyncio.sleep(sec) print(f"waited time: {sec}s") async def main(): task1 = asyncio.create_task(async_sleep(10)) task2 = asyncio.create_task(async_sleep(20)) await task1 await task2 asyncio.run(main())

この場合、 task1, task2 両方の完了時点である20秒後くらいに処理は終了します。

 

では、もう一個のtaskをこれらのtaskの下にたすとどうなるかというと、

import asyncio async def async_sleep(sec): await asyncio.sleep(sec) print(f"waited time: {sec}s") async def main(): task1 = asyncio.create_task(async_sleep(3)) task2 = asyncio.create_task(async_sleep(3)) await task1 await task2 task3 = asyncio.create_task(async_sleep(3)) await task3 asyncio.run(main())

この場合は、 task3task1task2 の完了後に実行されるということです。

このへんまではなんとなく想像がつきますし、よくある話です。

 

では、これだとどうでしょうか?

import asyncio import time async def async_sleep(sec): print("task start") await asyncio.sleep(sec) print(f"waited time: {sec}s") async def main(): print("start") task1 = asyncio.create_task(async_sleep(3)) task2 = asyncio.create_task(async_sleep(3)) time.sleep(5) print("sleep end") await task1 task3 = asyncio.create_task(async_sleep(3)) await task2 await task3 asyncio.run(main())

答えは、こうです。

start sleep end task start task start waited time: 3s waited time: 3s task start waited time: 3s

つまり、

  • create_task で積まれたタスクは溜まっており、 await task1 の行に達するまで実行されない

  • await task1 の行に達したら、そこまでの create_task されていたタスクは全て同時に実行されはじめる

ということになります。

 

ちなみに、

import asyncio import time async def async_sleep(sec): print("task start") await asyncio.sleep(sec) print(f"waited time: {sec}s") async def main(): task1 = asyncio.create_task(async_sleep(3)) task2 = asyncio.create_task(async_sleep(3)) task3 = asyncio.create_task(async_sleep(3)) asyncio.run(main())

のように、 await 行を用意せずに実行すると、溜まっていたタスクが実行されてそのまま終了を待たず終わります。

task start task start task start

とだけ出力される形ですね。

 

というような非常にわかりにくい挙動を示す中で、どういったことをすると良いか

 

asyncio.gatherを使う

まずは asyncio.gather を使う場合です。

async def main(): print("start") await asyncio.gather( async_sleep(3), async_sleep(3) ) time.sleep(5) print("sleep end")

これで、ちゃんと

start task start task start waited time: 3s waited time: 3s

と出てから5秒後に「sleep end」が出力されます。単純明快ですね。

 

TaskGroupを使う(Python 3.11以降)

Python3.11以降は asyncio.TaskGroup というものが新しく登場しています。

async def main(): print("start") async with asyncio.TaskGroup() as tg: tg.create_task(async_sleep(3)) tg.create_task(async_sleep(3)) time.sleep(5) print("sleep end")

これで同じ意味を持ちます。

 

なぜわざわざ TaskGroup が追加されたかというと、 except* という新しい構文で ExceptionGroup という形で例外を捕捉できるようになったからですね。

こうしないと、エラー把握に全てのタスクの完了を待つ必要があったり、最初の例外しか拾えなかったりとか例外処理およびタスクキャンセルまわりがかなり不便でしたが大きく改善されました。

 

まとめ

今回はPythonを使う人でも知らないままの人が多いかもしれない asyncio の話でした。

オープンソースのコードを読んでいるときなどに突然出くわすより、予めこういった機能については把握しておきたいところですね…あわよくばうまく使っていきたいです。

 

最近はWebアプリやちょっとしたバッチ処理等をPythonで書く方も多いかと思いますが、序盤で述べたように、ボトルネックがI/Oや他サービスからの反応待ちにある場合はぜひ活用しておきたい機能ですね。

 


この記事の著者

プロフィール画像

伴 拓也

朝日放送グループホールディングス株式会社 DX・メディアデザイン局 デジタル・メディアチーム

アプリケーションからインフラ、ネットワーク、データエンジニアリングまで幅広い守備範囲が売り。最近はデータ基盤の構築まわりに力を入れて取り組む。 主な実績として、M-1グランプリ敗者復活戦投票システムのマルチクラウド化等。