Python 並行処理:セマフォオブジェクトで共有リソースを安全に制御


Pythonにおける並行処理は、複数のタスクを同時に実行することで、プログラムのパフォーマンスと効率を向上させる強力なツールです。スレッドは、並行処理を実現するための主要なメカニズムの1つであり、複数のタスクを個別に実行できる軽量な実行単位を提供します。

しかし、複数のスレッドが共有リソースに同時にアクセスしようとすると、競合状態が発生する可能性があります。競合状態は、予期せぬ結果やデータ破損につながる可能性があるため、回避することが重要です。

セマフォオブジェクトとは

セマフォオブジェクトは、共有リソースへのアクセスを制御するために使用される同期プリミティブです。有限数のスレッドがリソースに同時にアクセスできるようにすることで、競合状態を回避します。

セマフォオブジェクトのしくみ

セマフォオブジェクトには、内部カウンターと2つの操作メソッド、acquire()release()があります。

  • release(): リソースの使用を完了したことを示します。カウンターがインクリメントされます。
  • acquire(): リソースへのアクセスを試みます。カウンターが0の場合は、スレッドはブロックされ、他のスレッドがrelease()を呼び出すまで待機します。カウンターが1以上の場合は、デクリメントされ、スレッドはリソースにアクセスできます。

セマフォオブジェクトの例

以下に、セマフォオブジェクトを使用して3つのスレッドがプリンタに排他的にアクセスできるようにする例を示します。

from threading import Semaphore

printer_semaphore = Semaphore(1)

def print_document(name):
    with printer_semaphore:
        print(f"{name} is printing a document.")
        time.sleep(2)

if __name__ == "__main__":
    thread1 = Thread(target=print_document, args=("Thread 1",))
    thread2 = Thread(target=print_document, args=("Thread 2",))
    thread3 = Thread(target=print_document, args=("Thread 3",))

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

この例では、printer_semaphoreというセマフォオブジェクトが作成され、初期値は1に設定されます。これは、一度に1つのスレッドしかプリンタにアクセスできないことを意味します。

各スレッドは、print_document()関数を実行します。この関数は、printer_semaphore.acquire()を呼び出してプリンタへのアクセスを取得しようとします。セマフォが利用可能であれば、スレッドはプリンタにアクセスし、ドキュメントを印刷します。その後、printer_semaphore.release()を呼び出してセマフォを解放します。

セマフォが利用できない場合、スレッドはprinter_semaphore.acquire()でブロックされます。他のスレッドがprinter_semaphore.release()を呼び出し、セマフォを解放すると、ブロックされたスレッドが実行されます。

セマフォオブジェクトの利点

  • デッドロックを回避する
  • コードをより読みやすく、理解しやすいようにする
  • 共有リソースへのアクセスを制御し、競合状態を回避する
  • セマフォオブジェクトの使用方法を誤ると、デッドロックが発生する可能性があります。
  • セマフォオブジェクトはスレッドセーフではありません。複数のスレッドから同じセマフォオブジェクトにアクセスする場合は、ロックを使用して保護する必要があります。


限られた数のリソースへのアクセスを制御

この例では、4つのスレッドがそれぞれ10回、共有リストに要素を追加しようとします。セマフォオブジェクトを使用して、一度に1つのスレッドしかリストにアクセスできるように制限します。

from threading import Semaphore, Thread

def add_to_list(semaphore, shared_list):
    with semaphore:
        for i in range(10):
            shared_list.append(i)

shared_list = []
semaphore = Semaphore(1)

threads = []
for i in range(4):
    thread = Thread(target=add_to_list, args=(semaphore, shared_list))
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(shared_list)

このコードを実行すると、shared_listには0から39までの要素が10回ずつ、合計40個出力されます。セマフォオブジェクトにより、一度に1つのスレッドしかリストにアクセスできないため、要素が重複することはありません。

複数のタスクの完了を待機

この例では、3つのスレッドがそれぞれ別々のタスクを実行し、すべてのタスクが完了するのを待機してからメインスレッドが処理を続行します。

from threading import Semaphore, Thread

def task(semaphore, event, name):
    with semaphore:
        print(f"{name} is starting its task.")
        time.sleep(2)
        print(f"{name} has completed its task.")
        event.set()

event = Semaphore(0)
semaphore = Semaphore(3)

threads = []
for i in range(3):
    thread = Thread(target=task, args=(semaphore, event, f"Thread {i + 1}"))
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    event.wait()

print("All tasks have been completed.")

このコードを実行すると、以下の出力が表示されます。

Thread 1 is starting its task.
Thread 2 is starting its task.
Thread 3 is starting its task.
Thread 1 has completed its task.
Thread 2 has completed its task.
Thread 3 has completed its task.
All tasks have been completed.

各スレッドはtask()関数を呼び出し、2秒後に完了します。event.set()が呼び出されると、メインスレッドはevent.wait()でブロックされ、すべてのタスクが完了するまで待機します。

この例では、プロデューサーとコンシューマースレッド間でバッファ付きキューを共有するためにセマフォオブジェクトを使用します。プロデューサーはアイテムをキューに追加し、コンシューマはキューからアイテムを消費します。

from threading import Semaphore, Thread, Queue

def producer(semaphore, queue):
    with semaphore:
        for i in range(10):
            queue.put(i)
            print(f"Produced item: {i}")
            time.sleep(1)

def consumer(queue):
    while True:
        try:
            item = queue.get(block=True, timeout=1)
            print(f"Consumed item: {item}")
        except queue.Empty:
            break

semaphore = Semaphore(2)
queue = Queue(maxsize=3)

producer_thread = Thread(target=producer, args=(semaphore, queue))
consumer_thread = Thread(target=consumer, args=(queue,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()
Produced item: 0
Consumed item: 0
Produced item: 1
Consumed item: 1
Produced item: 2
Consumed item: 2
Produced item: 3
Consumed item: 3
Produced item: 4
Produced item: 5
Produced item: 6
Produced item: 7
Produced item: 8
Produced item: 9
Consumed item: 4
Consumed item: 5
Consumed item: 6
Consumed item: 7
Consumed item: 8
Consumed item: 9


代替方法

  1. ロック

ロックは、共有リソースへの排他アクセスを制御するための最も基本的な同期プリミティブです。threading.Lockオブジェクトを使用して、コードのクリティカルセクションを保護することができます。

利点

  • 軽量で効率的
  • シンプルで使いやすい

欠点

  • 優先順位制御ができない
  • デッドロックが発生しやすい
  1. イベント

イベントは、1つのスレッドがタスクを完了したことを他のスレッドに通知するために使用されます。threading.Eventオブジェクトを使用して、イベントを待機したり、シグナルを送信したりすることができます。

利点

  • 複数のスレッドがイベントを待機できる
  • デッドロックが発生しにくい

欠点

  • 共有リソースへの排他アクセスを制御できない
  • ロックほど効率的ではない
  1. 条件変数

条件変数は、ロックとイベントを組み合わせた機能を提供します。threading.Conditionオブジェクトを使用して、共有リソースへのアクセスを制御し、イベントを待機することができます。

利点

  • 複雑な同期要件に対応できる
  • ロックとイベントの利点を組み合わせている

欠点

  • 誤った使用はデッドロックにつながる
  • ロックやイベントよりも複雑で理解しにくい
  1. キュー

キューは、スレッド間でデータをやり取りするための同期構造体です。queue.Queueモジュールを使用して、スレッドセーフなキューを作成し、アイテムを追加したり取り出したりすることができます。

利点

  • バッファリング機能を提供する
  • スレッド間でデータを効率的にやり取りできる

欠点

  • 共有リソースへの排他アクセスを制御できない
  1. メッセージング

メッセージングは、異なるスレッド間でメッセージをやり取りするための非同期通信メカニズムです。multiprocessing.Queueモジュールを使用して、プロセス間でキューを作成し、メッセージを送受信することができます。

利点

  • 非同期通信が可能
  • 異なるプロセス間でデータをやり取りできる

欠点

  • スレッド間同期には直接使用できない

セマフォオブジェクトは、スレッド間同期を実現するための汎用的なツールですが、状況によっては代替方法の方が適している場合があります。

  • スレッド間でデータをやり取りする必要がある場合は、キューまたはメッセージングが適しています。
  • デッドロックが発生しやすい状況を回避したい場合は、イベントまたは条件変数が適しています。
  • シンプルで効率的な同期が必要な場合は、ロックが適しています。