Pythonのasync/await(Producer-Consumer編)

はじめに

Pythonのasync構文を使って、簡単なProducer-Consumerパターンを書いてみます。

async awaitの前に

コルーチンの知識があったほうがよいので、まずはコルーチン関数を抑えておくことをお勧めします。以下のスライドの23p~26pの説明が分かりやすいです。

ざっくり言うと、一時停止と再開が可能で外からコントロールできる関数がコルーチンです。

async await構文

async await構文はPython3.5で非同期プログラミングをサポートするために導入された構文です。
以下、リリースハイライトの抜粋です。

コルーチン関数は新たな構文 async def を用いて定義されます

コルーチン関数内で、新たな await 式を用いることで結果が利用可能になるまでコルーチンの実行を停止することが出来ます。

またasync/await導入の元となった、PEP492のabstract読んでみると

It is proposed to make coroutines a proper standalone concept in Python,

The ultimate goal is to help … asynchronous programming in Python and make it as close to synchronous programming as possible.

とあるので、Pythonの中でのコルーチンのコンセプトをはっきりさせ、非同期プログラミングをできるだけ意識しないで書けるようにすることが目的とのことです。

今までは、ジェネレータもコルーチンも同じyieldキーワード使っていたので、両者を混同しやすかったのもこの構文導入のきっかけのようです。

以上を踏まえて、早速Producer-Consumerパターンのサンプルを書いてみます。

Pythonのサンプル

以下PythonのProducer-Consumerのサンプルです。

import asyncio
import random

async def producer(num, queue):
    while True:
        x = random.randint(1, 100)
        print("producer-" + str(num) + " : " + str(x))
        await queue.put(x)
        await asyncio.sleep(10)

async def consumer(num, queue):
    while True:
        value = await queue.get()
        print("consumer-" + str(num) + " : " + str(value))

以下のmainで動かすと

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    q = asyncio.Queue()
    loop.create_task(producer(1, q))
    loop.create_task(producer(2, q))
    loop.create_task(consumer(1, q))
    loop.run_forever()

以下のような出力が得られます。非同期でproducer/consumerが動いていますね。

producer-1 : 5
producer-2 : 9
consumer-1 : 5
consumer-1 : 9
producer-1 : 12
producer-2 : 96
consumer-1 : 12
consumer-1 : 96

Javaのサンプル(おまけ)

ついでにJavaでもProducer-Consumerのサンプル書いてみました。こんな感じになります。

import java.util.Random;
import java.util.concurrent.*;

public class Sample {
    public static void producer(int num, BlockingQueue queue){
        Random rand = new Random();
        while(true) {
            try{
                int x = rand.nextInt(100);
                System.out.println("producer-" + num + " : " + x);
                queue.put(x);
                Thread.sleep(10000);
            }catch(InterruptedException e){}
        }
    }

    public static void consumer(int num, BlockingQueue queue) {
        while(true){
            try{
                int value = queue.take();
                System.out.println("consumer-" + num + " : " + value);
            }catch(InterruptedException e){}
        }
    }

以下のmainで動かすと似たような出力を得られます。

public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        BlockingQueue q = new LinkedBlockingQueue<>();
        service.submit(() -> producer(1, q));
        service.submit(() -> producer(2, q));
        service.submit(() -> consumer(1, q));
        Thread.sleep(Integer.MAX_VALUE);
    }

まとめ

async/await構文により、非同期の関数という意図が読み手に伝わりやすくなり、かつ書き手にとってはより非同期を意識せずにコーディングが出来るようになりました。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

CAPTCHA