sched.scheduler による Python の並行処理:分かりやすい解説
sched.scheduler の基本的な仕組み
sched.scheduler は、イベントと呼ばれる個々の処理単位を時間軸に沿って管理します。各イベントは、実行時刻と実行する関数情報で構成されます。スケジューラは、現在時刻と照らし合わせ、実行時刻を迎えたイベントを順次実行していきます。
イベントのスケジューリング
イベントをスケジューリングするには、以下の2つのメソッドを用います。
- enterabs(): 絶対時刻と実行する関数を指定して、イベントをスケジューリングします。絶対時間は、イベント実行時刻をPOSIXエポック時間(秒単位)で指定します。
- enter(): 遅延時間と実行する関数を指定して、イベントをスケジューリングします。遅延時間は、現在時刻からイベント実行までの時間(秒単位)を指定します。
イベントの実行
スケジューリングされたイベントは、run() メソッドを呼び出すことで実行されます。このメソッドは、スケジューリングキューに登録されているイベントのうち、現在時刻時点で実行時刻を迎えたイベントを全て実行します。
sched.scheduler の並行実行
sched.scheduler は、マルチスレッド環境で安全に使用することができます。そのため、スケジューリングされたイベントを並行的に実行することができます。ただし、注意点は以下の通りです。
- 複数のイベントが同時に実行時刻を迎えた場合は、優先順位に基づいて実行されます。デフォルトの優先順位はイベントの登録順序ですが、priority 引数を使用して変更することができます。
- sched.scheduler は、真の並行処理ではなく、擬似的な並行処理を実現します。実際には、単一のスレッドでイベントを順番に実行していきます。
sched.scheduler の例
以下の例は、sched.scheduler を用いて2つのイベントを並行的に実行するプログラムです。
import sched
import time
def event1():
print('イベント1が実行されました。')
def event2():
print('イベント2が実行されました。')
scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)
scheduler.enter(2, 2, event2)
scheduler.run()
このプログラムを実行すると、以下の出力が得られます。
イベント1が実行されました。
イベント2が実行されました。
sched.scheduler の注意点
sched.scheduler は、シンプルな並行処理を実現するのに役立ちますが、複雑な並行処理には適していません。より複雑な並行処理を行う場合は、threading や asyncio などのモジュールを使用することを検討してください。
sched.scheduler に関する詳細は、以下のドキュメントを参照してください。
2つのイベントを異なる間隔で実行
この例では、2つのイベントをそれぞれ1秒間隔と2秒間隔で実行します。
import sched
import time
def event1():
print('イベント1が実行されました。')
def event2():
print('イベント2が実行されました。')
scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)
scheduler.enter(2, 2, event2)
scheduler.enter(3, 1, event1) # イベント1を再度登録
scheduler.enter(4, 2, event2) # イベント2を再度登録
scheduler.run()
イベント1が実行されました。
イベント2が実行されました。
イベント1が実行されました。
イベント2が実行されました。
優先順位付きイベント
この例では、優先順位を指定してイベントをスケジューリングし、優先順位の高いイベントが先に実行されることを確認します。
import sched
import time
def event1():
print('イベント1が実行されました。')
def event2():
print('イベント2が実行されました。')
scheduler = sched.scheduler()
scheduler.enter(1, 2, event1, priority=1) # 優先順位1
scheduler.enter(2, 1, event2, priority=2) # 優先順位2
scheduler.run()
イベント2が実行されました。
イベント1が実行されました。
特定の時間にイベントを実行
この例では、enterabs() メソッドを使用して、特定の時間にイベントを実行します。
import sched
import time
def event1():
print('イベント1が実行されました。')
scheduler = sched.scheduler()
scheduler.enterabs(time.time() + 5, 1, event1) # 5秒後に実行
scheduler.run()
このプログラムを実行すると、5秒後に「イベント1が実行されました。」という出力が出力されます。
無限ループでイベントを繰り返し実行
この例では、while ループを使用して、イベントを繰り返し実行します。
import sched
import time
def event1():
print('イベント1が実行されました。')
scheduler = sched.scheduler()
scheduler.enter(1, 1, event1)
while True:
scheduler.run()
time.sleep(1) # 1秒待つ
このプログラムを実行すると、「イベント1が実行されました。」という出力が1秒間隔で繰り返し出力されます。
キャンセルイベント
この例では、enter() メソッドでイベントを登録し、その後 cancel() メソッドでイベントをキャンセルします。
import sched
import time
def event1():
print('イベント1が実行されました。')
scheduler = sched.scheduler()
event_id = scheduler.enter(1, 1, event1)
print('イベントをキャンセルします。')
scheduler.cancel(event_id)
scheduler.run()
threading モジュール
threading モジュールは、Python 標準ライブラリに含まれるスレッド管理機能を提供します。このモジュールを用いることで、複数のタスクを別々のスレッドで実行することができます。sched.scheduler と比較すると、以下の利点があります。
- より複雑な並行処理に対応できる
- 真の並行処理を実現できる
一方、以下の点に注意する必要があります。
- デッドロックなどの問題が発生する可能性がある
- スレッド間の同期処理が必要になる
例
import threading
def task1():
print('タスク1が実行されました。')
def task2():
print('タスク2が実行されました。')
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
asyncio モジュール
asyncio モジュールは、Python 3.5 以降で導入された非同期処理のためのライブラリです。このモジュールを用いることで、イベントループと呼ばれる制御機構に基づいて、非同期的に複数のタスクを実行することができます。threading モジュールと比較すると、以下の利点があります。
- コルーチンと呼ばれる軽量な並行処理機構を利用できる
- コードがよりシンプルで分かりやすい
- asyncio に対応していないライブラリやモジュールを使用できない
- threading モジュールよりも新しい機能であり、習得に時間がかかる
例
import asyncio
async def task1():
print('タスク1が実行されました。')
async def task2():
print('タスク2が実行されました。')
async def main():
await asyncio.gather(task1(), task2())
if __name__ == '__main__':
asyncio.run(main())
上記以外にも、様々な並行処理ライブラリが用意されています。
- Ray: 分散処理と機械学習に特化したフレームワーク
- Dask: 分散処理に特化したライブラリ
- Joblib: 高速な並行処理に特化したライブラリ
これらのライブラリは、それぞれ異なる機能や特性を持つため、具体的なニーズに合わせて選択する必要があります。