こんにちは、ユウです。
先日、Pythonの非同期処理について勉強したので、本記事にまとめようと思います。
本記事は、下記の書籍を参考にしています。
非同期処理と協調的マルチタスク
非同期処理とは、プログラムが特定のタスクを実行している間に、他のタスクを同時に進行できる手法のことを指します。これにより、プログラムは時間のかかる処理(ネットワーク通信やファイルの読み書きなど)を他のタスクをブロックすることなく実行できます(Wikipedia)。
協調的マルチタスクとは、非同期処理の中心となる要素です。このマルチタスクの方式では、各プロセスは待機状態になると自発的に制御を開放し、同時に実行されている他のプロセスに制御を渡します。制御を渡されたプロセスはタスクを実行します。
例えば、あるプロセスが入出力操作(ネットワーク通信やファイルの読み書きなど)を行うタイミングで制御を開放すれば、入出力操作の完了を待たずに他のプロセスがタスクを実行できます。
なお、アプリケーションレベルで協調的マルチタスクを行うときは、複数のプロセスやスレッドを協調させるのではなく、すべてを1つのプロセスやスレッドの中で実行します。具体的には、1つの関数が複数のタスクの実行を制御します。この関数は多くの場合、イベントループとして実装されます。
Pythonにおける非同期処理メモ
Pythonで非同期処理を行うにはコルーチンを定義します。
コルーチンは、非同期処理で実行されるタスクです。コルーチンは「中断できる関数」と考えるとわかりやすいです。
コルーチンはasync def
で定義されます。
コルーチンと通常の関数を対比すると以下のとおりです。
def func
: 関数func
を定義します。func
は関数オブジェクトを返します。- 関数を呼び出す(実行する)には
func()
とします
async def func
: コルーチンfunc
を定義しますfunc
は関数オブジェクトを返します。func()
と呼び出してもコルーチンオブジェクトが返るだけです- コルーチンの実行にはイベントループまたは
asyncio.run
を使います
以下、コルーチンを使った簡単なサンプルコードです。
import asyncio
# コルーチンを定義
async def hello():
print("Hello!")
await asyncio.sleep(1)
print("World!")
# コルーチンを実行
asyncio.run(hello())
# 実行結果
Hello!
World!
上記のサンプルでは、awaitを使って処理(1秒間スリープ)が終わるのを待機しています。
また、待機している間は制御を開放し、他のタスクを実行できるようにします。
複数のタスクが並行に処理されることを確認するため以下のサンプルを考えます。
import asyncio
async def hello(name):
print(f"{name}: Hello!")
await asyncio.sleep(1)
print(f"{name}: World!")
async def main():
await asyncio.gather(
hello("first"),
hello("second"),
)
asyncio.run(main())
# 実行結果
first: Hello!
second: Hello! # first: World!の前に実行されている
first: World!
second: World!
上記の例では、gather
を使って複数のコルーチン(hello("first")
, hello("second")
)を並列に実行しています。
hello("first")
でprint(f"{name}: Hello!")
を実行したのちに、続くawait asyncio.sleep(1)
で制御を開放し、制御がhello("second")
に移ります。そして、hello("second")
のprint(f"{name}: Hello!")
が実行されています。
非同期処理の応用例
以下、非同期処理の実践的な応用例として、AWSのS3から複数ファイルを非同期処理を使って取得してみます。
まず、比較的容量が大きい(数十MG)のファイルを3つほどS3にアップロードしておきます。
これらのfileをboto3のget_object
メソッドを使って取得します。
まず、非同期処理を使わずに3つのfileを取得する場合のコードは以下のとおりです。
import time
import boto3
BUKET_NAME = "acyncio-test"
KEYS = [
"sample_files/large_file_1.pq",
"sample_files/large_file_2.pq",
"sample_files/large_file_3.pq",
]
s3 = boto3.client("s3")
def get_object(key):
print("start get object", key)
start = time.time()
s3.get_object(Bucket=BUKET_NAME, Key=key)
print("finish get object", key)
print(f"time: {time.time() - start} (key: {key})")
def main():
total_start = time.time()
for key in KEYS:
get_object(key)
print(f"total time: {time.time() - total_start}")
if __name__ == "__main__":
main()
このコードを実行すると以下の結果になりました。
% poetry run python samples/s3_sample/s3_get_obj.py
start get object sample_files/large_file_1.pq
finish get object sample_files/large_file_1.pq
time: 0.3777639865875244 (key: sample_files/large_file_1.pq)
start get object sample_files/large_file_2.pq
finish get object sample_files/large_file_2.pq
time: 0.23510098457336426 (key: sample_files/large_file_2.pq)
start get object sample_files/large_file_3.pq
finish get object sample_files/large_file_3.pq
time: 0.24019813537597656 (key: sample_files/large_file_3.pq)
total time: 0.8567490577697754
3つのファイルが順番に取得されているのがわかります。
一方、これを非同期処理を使って書き換えてみます。
boto3のget_objectメソッドは、メインスレッドで実行するとイベントループをブロックしてしまうようなIOバウンドメソッドに当たります。そのようなメソッドについては、asyncioのto_threadメソッドを使うことで、別のスレッドで非同期的に実行できます。
to_threadメソッドを使うとコードは以下のようになります。
import asyncio
import time
import boto3
BUKET_NAME = "acyncio-test"
KEYS = [
"sample_files/large_file_1.pq",
"sample_files/large_file_2.pq",
"sample_files/large_file_3.pq",
]
s3 = boto3.client("s3")
async def get_object(key):
print("start get object", key)
start = time.time()
await asyncio.to_thread(s3.get_object, Bucket=BUKET_NAME, Key=key)
print("finish get object", key)
print(f"key: {key}, time: {time.time() - start}")
async def main():
start = time.time()
tasks = [get_object(key) for key in KEYS]
await asyncio.gather(*tasks)
print(f"total time: {time.time() - start}")
if __name__ == "__main__":
asyncio.run(main())
結果は以下のとおりです。
% poetry run python samples/s3_sample/async_s3_get_obj.py
start get object sample_files/large_file_1.pq
start get object sample_files/large_file_2.pq
start get object sample_files/large_file_3.pq
finish get object sample_files/large_file_1.pq
key: sample_files/large_file_1.pq, time: 0.3632187843322754
finish get object sample_files/large_file_2.pq
key: sample_files/large_file_2.pq, time: 0.3656728267669678
finish get object sample_files/large_file_3.pq
key: sample_files/large_file_3.pq, time: 60.354564905166626
total time: 60.36102604866028
3つのファイルが非同期に取得されているのがわかります(large_file_1.pqの取得の完了前にlarge_file_2.pqの取得が開始されている等)。ですが、時間は同期処理の場合よりもかかってしまいました…その理由については現在、調査中です。
参考
- https://docs.python.org/ja/3/library/asyncio.html
- PythonOsaka/asyncioを使ってみよう
- エキスパートPythonプログラミング 改訂4版 (アスキードワンゴ)
- aws boto3 s3ドキュメント
- https://iuk.hateblo.jp/entry/2017/01/27/173449
コメント