【Python】スレッド同時実行を安全に制御! `threading.Semaphore.acquire()` のしくみとサンプルコード


共有リソースとデータ競合

マルチスレッドプログラミングにおいて、複数のスレッドが同時に同じ共有リソースにアクセスしようとすると、データ競合が発生する可能性があります。これは、スレッドが互いの変更を上書きしたり、不完全なデータを読み込んだりするなど、予期せぬ結果につながる可能性があります。

セマフォによる保護

threading.Semaphoreは、この問題を解決するための同期メカニズムを提供します。セマフォは、許可された最大アクセス数を表す内部カウンターを保持します。スレッドが共有リソースにアクセスするには、まずacquire()メソッドを呼び出してセマフォから許可を得る必要があります。

acquire()メソッドは、以下の処理を行います。

  1. セマフォのカウンターをデクリメントします。
  2. カウンターが0の場合は、呼び出しスレッドをブロックし、他のスレッドがrelease()を呼び出すまで待機します。
  3. カウンターが0より大きい場合は、呼び出しスレッドがリソースにアクセスできるようにします。

リソースへのアクセスが完了したら、スレッドはrelease()メソッドを呼び出してセマフォのカウンターをインクリメントする必要があります。これにより、他のスレッドがリソースにアクセスできるようになります。

import threading

semaphore = threading.Semaphore(max_value=1)

def access_resource():
    # 共有リソースにアクセスする処理
    with semaphore:
        # セマフォで保護された領域
        pass

# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
    thread = threading.Thread(target=access_resource)
    thread.start()

この例では、max_value=1を指定して、一度に1つのスレッドだけがaccess_resource関数を実行できるようにしています。withステートメントを使用すると、acquire()release()の呼び出しを自動的に処理できます。



import threading
import time

def access_resource(num):
    print(f"スレッド {num} がリソースにアクセスしています")
    time.sleep(1)
    print(f"スレッド {num} がリソースから解放されました")

semaphore = threading.Semaphore(max_value=2)

for i in range(5):
    thread = threading.Thread(target=access_resource, args=(i,))
    thread.start()

# すべてのスレッドが完了するのを待つ
for thread in threading.active_count():
    thread.join()

このコードでは、access_resource関数がsemaphoreを使用して共有リソースへのアクセスを制御します。最大2つのスレッドが同時にリソースにアクセスできるように設定されています。

各スレッドは、access_resource関数を呼び出され、そのスレッド番号とランダムな待ち時間を引数として渡されます。関数内では、スレッド番号とメッセージを出力し、1秒間スリープしてから、再度メッセージを出力します。

メインスレッドは、5つのスレッドを作成して開始し、その後、すべてのスレッドが完了するのを待機してから終了します。

この例は、threading.Semaphore.acquire()を使用して、共有リソースへのアクセスを制限し、スレッド間で競合が発生しないようにする方法を示しています。

以下のコードは、threading.Semaphore.acquire()を使用して、プリンタへの排他アクセスを制御する例です。

import threading
import time

def print_document(name):
    print(f"{name} が印刷を開始しました")
    time.sleep(1.5)
    print(f"{name} の印刷が完了しました")

semaphore = threading.Semaphore(max_value=1)

documents = ["文書1.txt", "文書2.pdf", "文書3.docx"]

for document in documents:
    thread = threading.Thread(target=print_document, args=(document,))
    thread.start()

# すべてのスレッドが完了するのを待つ
for thread in threading.active_count():
    thread.join()

この例では、print_document関数がsemaphoreを使用してプリンタへのアクセスを制御します。一度に1つのドキュメントしか印刷できないように設定されています。

各スレッドは、print_document関数を呼び出され、ドキュメント名を引数として渡されます。関数内では、ドキュメント名とメッセージを出力し、1.5秒間スリープしてから、再度メッセージを出力します。

メインスレッドは、documentsリスト内の各ドキュメントに対してスレッドを作成して開始し、その後、すべてのスレッドが完了するのを待機してから終了します。



ロックオブジェクト

ロックオブジェクトは、共有リソースへの排他アクセスを制御するためのもう1つの一般的な方法です。threading.Lockmultiprocessing.Lockなどのロックオブジェクトを使用できます。

長所

  • セmaphoreよりも軽量
  • シンプルで使いやすい

短所

  • 優先順位付けの機能がない: ロックを取得するスレッドの順序を制御できない
  • 再入可能ではない: すでにロックを持っているスレッドが再度ロックしようとすると、デッドロックが発生する可能性がある


import threading

lock = threading.Lock()

def access_resource():
    with lock:
        # 共有リソースにアクセスする処理
        pass

# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
    thread = threading.Thread(target=access_resource)
    thread.start()

イベント

イベントは、スレッド間の通信に使用できる同期プリミティブです。スレッドは、イベントを待機したり、シグナルを送信したりできます。

長所

  • 優先順位付けが可能: イベントを待機するスレッドの順序を制御できる
  • スレッド間の通信に使用できる

短所

  • 共有リソースへの排他アクセスを制御するには、追加のロックメカニズムが必要


import threading

event = threading.Event()

def access_resource():
    event.wait()
    # 共有リソースにアクセスする処理
    event.clear()

# 1つのスレッドがリソースにアクセスできるようにする
resource_access_thread = threading.Thread(target=access_resource)
resource_access_thread.start()

# 他のスレッドがリソースにアクセスできるようにシグナルを送信する
time.sleep(2)
event.set()

# すべてのスレッドが完了するのを待つ
resource_access_thread.join()

条件変数

条件変数は、ロックオブジェクトとイベントを組み合わせた機能を提供します。共有リソースへの排他アクセスを制御し、スレッド間の通信に使用できます。

長所

  • 優先順位付けが可能
  • スレッド間の通信に使用できる
  • 共有リソースへの排他アクセスを制御できる

短所

  • ロックオブジェクトとイベントよりも複雑


import threading

condition = threading.Condition()

def access_resource():
    with condition:
        condition.wait_for(lambda: resource_available)
        # 共有リソースにアクセスする処理
        resource_available = False
        condition.notify_all()

resource_available = True

# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
    thread = threading.Thread(target=access_resource)
    thread.start()

キュー

キューは、スレッド間でデータをやり取りするためのもう1つの方法です。スレッドは、キューにアイテムを追加したり、キューからアイテムを取り出したりできます。

長所

  • 優先順位付けが可能
  • スレッド間でデータをやり取りするのに便利

短所

  • 共有リソースへの排他アクセスを制御するには、追加のロックメカニズムが必要
import threading
import queue

q = queue.Queue()

def access_resource():
    while True:
        try:
            item = q.get(block=True)
            # 共有リソースにアクセスする処理
        except queue.Empty:
            break

# キューにアイテムを追加する
for i in range(10):
    q.put(i)

# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
    thread = threading.Thread(target=access_resource)
    thread.start()