sched.scheduler による Python の並行処理:分かりやすい解説


sched.scheduler の基本的な仕組み

sched.scheduler は、イベントと呼ばれる個々の処理単位を時間軸に沿って管理します。各イベントは、実行時刻と実行する関数情報で構成されます。スケジューラは、現在時刻と照らし合わせ、実行時刻を迎えたイベントを順次実行していきます。

イベントのスケジューリング

イベントをスケジューリングするには、以下の2つのメソッドを用います。

  • enterabs(): 絶対時刻と実行する関数を指定して、イベントをスケジューリングします。絶対時間は、イベント実行時刻をPOSIXエポック時間(秒単位)で指定します。
  • enter(): 遅延時間と実行する関数を指定して、イベントをスケジューリングします。遅延時間は、現在時刻からイベント実行までの時間(秒単位)を指定します。

イベントの実行

スケジューリングされたイベントは、run() メソッドを呼び出すことで実行されます。このメソッドは、スケジューリングキューに登録されているイベントのうち、現在時刻時点で実行時刻を迎えたイベントを全て実行します。

sched.scheduler の並行実行

sched.scheduler は、マルチスレッド環境で安全に使用することができます。そのため、スケジューリングされたイベントを並行的に実行することができます。ただし、注意点は以下の通りです。

  • 複数のイベントが同時に実行時刻を迎えた場合は、優先順位に基づいて実行されます。デフォルトの優先順位はイベントの登録順序ですが、priority 引数を使用して変更することができます。
  • sched.scheduler は、真の並行処理ではなく、擬似的な並行処理を実現します。実際には、単一のスレッドでイベントを順番に実行していきます。

sched.scheduler の例

以下の例は、sched.scheduler を用いて2つのイベントを並行的に実行するプログラムです。

import sched
import time

def event1():
    print('イベント1が実行されました。')

def event2():
    print('イベント2が実行されました。')

scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)
scheduler.enter(2, 2, event2)

scheduler.run()

このプログラムを実行すると、以下の出力が得られます。

イベント1が実行されました。
イベント2が実行されました。

sched.scheduler の注意点

sched.scheduler は、シンプルな並行処理を実現するのに役立ちますが、複雑な並行処理には適していません。より複雑な並行処理を行う場合は、threadingasyncio などのモジュールを使用することを検討してください。

sched.scheduler に関する詳細は、以下のドキュメントを参照してください。



2つのイベントを異なる間隔で実行

この例では、2つのイベントをそれぞれ1秒間隔と2秒間隔で実行します。

import sched
import time

def event1():
    print('イベント1が実行されました。')

def event2():
    print('イベント2が実行されました。')

scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)
scheduler.enter(2, 2, event2)
scheduler.enter(3, 1, event1)  # イベント1を再度登録
scheduler.enter(4, 2, event2)  # イベント2を再度登録

scheduler.run()
イベント1が実行されました。
イベント2が実行されました。
イベント1が実行されました。
イベント2が実行されました。

優先順位付きイベント

この例では、優先順位を指定してイベントをスケジューリングし、優先順位の高いイベントが先に実行されることを確認します。

import sched
import time

def event1():
    print('イベント1が実行されました。')

def event2():
    print('イベント2が実行されました。')

scheduler = sched.scheduler()
scheduler.enter(1, 2, event1, priority=1)  # 優先順位1
scheduler.enter(2, 1, event2, priority=2)  # 優先順位2

scheduler.run()
イベント2が実行されました。
イベント1が実行されました。

特定の時間にイベントを実行

この例では、enterabs() メソッドを使用して、特定の時間にイベントを実行します。

import sched
import time

def event1():
    print('イベント1が実行されました。')

scheduler = sched.scheduler()
scheduler.enterabs(time.time() + 5, 1, event1)  # 5秒後に実行

scheduler.run()

このプログラムを実行すると、5秒後に「イベント1が実行されました。」という出力が出力されます。

無限ループでイベントを繰り返し実行

この例では、while ループを使用して、イベントを繰り返し実行します。

import sched
import time

def event1():
    print('イベント1が実行されました。')

scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)

while True:
    scheduler.run()
    time.sleep(1)  # 1秒待つ

このプログラムを実行すると、「イベント1が実行されました。」という出力が1秒間隔で繰り返し出力されます。

キャンセルイベント

この例では、enter() メソッドでイベントを登録し、その後 cancel() メソッドでイベントをキャンセルします。

import sched
import time

def event1():
    print('イベント1が実行されました。')

scheduler = sched.scheduler()
event_id = scheduler.enter(1, 1, event1)

print('イベントをキャンセルします。')
scheduler.cancel(event_id)

scheduler.run()


threading モジュール

threading モジュールは、Python 標準ライブラリに含まれるスレッド管理機能を提供します。このモジュールを用いることで、複数のタスクを別々のスレッドで実行することができます。sched.scheduler と比較すると、以下の利点があります。

  • より複雑な並行処理に対応できる
  • 真の並行処理を実現できる

一方、以下の点に注意する必要があります。

  • デッドロックなどの問題が発生する可能性がある
  • スレッド間の同期処理が必要になる

import threading

def task1():
    print('タスク1が実行されました。')

def task2():
    print('タスク2が実行されました。')

thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

asyncio モジュール

asyncio モジュールは、Python 3.5 以降で導入された非同期処理のためのライブラリです。このモジュールを用いることで、イベントループと呼ばれる制御機構に基づいて、非同期的に複数のタスクを実行することができます。threading モジュールと比較すると、以下の利点があります。

  • コルーチンと呼ばれる軽量な並行処理機構を利用できる
  • コードがよりシンプルで分かりやすい
  • asyncio に対応していないライブラリやモジュールを使用できない
  • threading モジュールよりも新しい機能であり、習得に時間がかかる

import asyncio

async def task1():
    print('タスク1が実行されました。')

async def task2():
    print('タスク2が実行されました。')

async def main():
    await asyncio.gather(task1(), task2())

if __name__ == '__main__':
    asyncio.run(main())

上記以外にも、様々な並行処理ライブラリが用意されています。

  • Ray: 分散処理と機械学習に特化したフレームワーク
  • Dask: 分散処理に特化したライブラリ
  • Joblib: 高速な並行処理に特化したライブラリ

これらのライブラリは、それぞれ異なる機能や特性を持つため、具体的なニーズに合わせて選択する必要があります。