Pythonの非同期処理メモ

Python

こんにちは、ユウです。

先日、Pythonの非同期処理について勉強したので、本記事にまとめようと思います。

本記事は、下記の書籍を参考にしています。

非同期処理とは、プログラムが特定のタスクを実行している間に、他のタスクを同時に進行できる手法のことを指します。これにより、プログラムは時間のかかる処理(ネットワーク通信やファイルの読み書きなど)を他のタスクをブロックすることなく実行できます(Wikipedia)。

協調的マルチタスクとは、非同期処理の中心となる要素です。このマルチタスクの方式では、各プロセスは待機状態になると自発的に制御を開放し、同時に実行されている他のプロセスに制御を渡します。制御を渡されたプロセスはタスクを実行します。

例えば、あるプロセスが入出力操作(ネットワーク通信やファイルの読み書きなど)を行うタイミングで制御を開放すれば、入出力操作の完了を待たずに他のプロセスがタスクを実行できます。

なお、アプリケーションレベルで協調的マルチタスクを行うときは、複数のプロセスやスレッドを協調させるのではなく、すべてを1つのプロセスやスレッドの中で実行します。具体的には、1つの関数が複数のタスクの実行を制御します。この関数は多くの場合、イベントループとして実装されます。

Pythonで非同期処理を行うにはコルーチンを定義します。

コルーチンは、非同期処理で実行されるタスクです。コルーチンは「中断できる関数」と考えるとわかりやすいです。

コルーチンはasync defで定義されます。

コルーチンと通常の関数を対比すると以下のとおりです。

  • def func: 関数funcを定義します。
    • funcは関数オブジェクトを返します。
    • 関数を呼び出す(実行する)にはfunc()とします
  • async def func: コルーチンfuncを定義します
    • funcは関数オブジェクトを返します。
    • func()と呼び出してもコルーチンオブジェクトが返るだけです
    • コルーチンの実行にはイベントループまたはasyncio.runを使います

コルーチンという用語は、コルーチン関数(async def)を指すほか、 コルーチンオブジェクト(コルーチン関数を呼び出すと返ってくるオブジェクト)を指すこともあります(参考

以下、コルーチンを使った簡単なサンプルコードです。

import asyncio

# コルーチンを定義
async def hello():
    print("Hello!")
    await asyncio.sleep(1)
    print("World!")

# コルーチンを実行
asyncio.run(hello())

# 実行結果
Hello!
World!

上記のサンプルでは、awaitを使って処理(1秒間スリープ)が終わるのを待機しています。

また、待機している間は制御を開放し、他のタスクを実行できるようにします。

複数のタスクが並行に処理されることを確認するため以下のサンプルを考えます。

import asyncio

async def hello(name):
    print(f"{name}: Hello!")
    await asyncio.sleep(1)
    print(f"{name}: World!")

async def main():
    await asyncio.gather(
        hello("first"),
        hello("second"),
    )

asyncio.run(main())

# 実行結果
first: Hello!
second: Hello! # first: World!の前に実行されている
first: World!
second: World!

上記の例では、gatherを使って複数のコルーチン(hello("first"), hello("second"))を並列に実行しています。

hello("first")print(f"{name}: Hello!")を実行したのちに、続くawait asyncio.sleep(1)で制御を開放し、制御がhello("second")に移ります。そして、hello("second")print(f"{name}: Hello!")が実行されています。

以下、非同期処理の実践的な応用例として、AWSのS3から複数ファイルを非同期処理を使って取得してみます。

まず、比較的容量が大きい(数十MG)のファイルを3つほどS3にアップロードしておきます。

これらのfileをboto3のget_objectメソッドを使って取得します。

まず、非同期処理を使わずに3つのfileを取得する場合のコードは以下のとおりです。

import time

import boto3

BUKET_NAME = "acyncio-test"
KEYS = [
    "sample_files/large_file_1.pq",
    "sample_files/large_file_2.pq",
    "sample_files/large_file_3.pq",
]

s3 = boto3.client("s3")


def get_object(key):
    print("start get object", key)
    start = time.time()

    s3.get_object(Bucket=BUKET_NAME, Key=key)

    print("finish get object", key)
    print(f"time: {time.time() - start} (key: {key})")


def main():
    total_start = time.time()

    for key in KEYS:
        get_object(key)

    print(f"total time: {time.time() - total_start}")


if __name__ == "__main__":
    main()

このコードを実行すると以下の結果になりました。

% poetry run python samples/s3_sample/s3_get_obj.py
start get object sample_files/large_file_1.pq
finish get object sample_files/large_file_1.pq
time: 0.3777639865875244 (key: sample_files/large_file_1.pq)
start get object sample_files/large_file_2.pq
finish get object sample_files/large_file_2.pq
time: 0.23510098457336426 (key: sample_files/large_file_2.pq)
start get object sample_files/large_file_3.pq
finish get object sample_files/large_file_3.pq
time: 0.24019813537597656 (key: sample_files/large_file_3.pq)
total time: 0.8567490577697754

3つのファイルが順番に取得されているのがわかります。

一方、これを非同期処理を使って書き換えてみます。

boto3のget_objectメソッドは、メインスレッドで実行するとイベントループをブロックしてしまうようなIOバウンドメソッドに当たります。そのようなメソッドについては、asyncioのto_threadメソッドを使うことで、別のスレッドで非同期的に実行できます。

to_threadメソッドを使うとコードは以下のようになります。

import asyncio
import time

import boto3

BUKET_NAME = "acyncio-test"
KEYS = [
    "sample_files/large_file_1.pq",
    "sample_files/large_file_2.pq",
    "sample_files/large_file_3.pq",
]

s3 = boto3.client("s3")


async def get_object(key):
    print("start get object", key)
    start = time.time()

    await asyncio.to_thread(s3.get_object, Bucket=BUKET_NAME, Key=key)

    print("finish get object", key)
    print(f"key: {key}, time: {time.time() - start}")


async def main():
    start = time.time()

    tasks = [get_object(key) for key in KEYS]
    await asyncio.gather(*tasks)

    print(f"total time: {time.time() - start}")


if __name__ == "__main__":
    asyncio.run(main())

結果は以下のとおりです。

% poetry run python samples/s3_sample/async_s3_get_obj.py
start get object sample_files/large_file_1.pq
start get object sample_files/large_file_2.pq
start get object sample_files/large_file_3.pq
finish get object sample_files/large_file_1.pq
key: sample_files/large_file_1.pq, time: 0.3632187843322754
finish get object sample_files/large_file_2.pq
key: sample_files/large_file_2.pq, time: 0.3656728267669678
finish get object sample_files/large_file_3.pq
key: sample_files/large_file_3.pq, time: 60.354564905166626
total time: 60.36102604866028

3つのファイルが非同期に取得されているのがわかります(large_file_1.pqの取得の完了前にlarge_file_2.pqの取得が開始されている等)。ですが、時間は同期処理の場合よりもかかってしまいました…その理由については現在、調査中です。

プロフィール
この記事を書いた人

30代半ばで未経験でプログラマーに転職し、日々奮闘中です
プログラミング、AI、NLP、キャリア関連などで少しでも役に立てる情報を発信していきます

ユウをフォローする
Python
ユウをフォローする

コメント

タイトルとURLをコピーしました