【Python】スレッド同時実行を安全に制御! `threading.Semaphore.acquire()` のしくみとサンプルコード
共有リソースとデータ競合
マルチスレッドプログラミングにおいて、複数のスレッドが同時に同じ共有リソースにアクセスしようとすると、データ競合が発生する可能性があります。これは、スレッドが互いの変更を上書きしたり、不完全なデータを読み込んだりするなど、予期せぬ結果につながる可能性があります。
セマフォによる保護
threading.Semaphore
は、この問題を解決するための同期メカニズムを提供します。セマフォは、許可された最大アクセス数を表す内部カウンターを保持します。スレッドが共有リソースにアクセスするには、まずacquire()
メソッドを呼び出してセマフォから許可を得る必要があります。
acquire()
メソッドは、以下の処理を行います。
- セマフォのカウンターをデクリメントします。
- カウンターが0の場合は、呼び出しスレッドをブロックし、他のスレッドが
release()
を呼び出すまで待機します。 - カウンターが0より大きい場合は、呼び出しスレッドがリソースにアクセスできるようにします。
リソースへのアクセスが完了したら、スレッドはrelease()
メソッドを呼び出してセマフォのカウンターをインクリメントする必要があります。これにより、他のスレッドがリソースにアクセスできるようになります。
import threading
semaphore = threading.Semaphore(max_value=1)
def access_resource():
# 共有リソースにアクセスする処理
with semaphore:
# セマフォで保護された領域
pass
# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
thread = threading.Thread(target=access_resource)
thread.start()
この例では、max_value=1
を指定して、一度に1つのスレッドだけがaccess_resource
関数を実行できるようにしています。with
ステートメントを使用すると、acquire()
とrelease()
の呼び出しを自動的に処理できます。
import threading
import time
def access_resource(num):
print(f"スレッド {num} がリソースにアクセスしています")
time.sleep(1)
print(f"スレッド {num} がリソースから解放されました")
semaphore = threading.Semaphore(max_value=2)
for i in range(5):
thread = threading.Thread(target=access_resource, args=(i,))
thread.start()
# すべてのスレッドが完了するのを待つ
for thread in threading.active_count():
thread.join()
このコードでは、access_resource
関数がsemaphore
を使用して共有リソースへのアクセスを制御します。最大2つのスレッドが同時にリソースにアクセスできるように設定されています。
各スレッドは、access_resource
関数を呼び出され、そのスレッド番号とランダムな待ち時間を引数として渡されます。関数内では、スレッド番号とメッセージを出力し、1秒間スリープしてから、再度メッセージを出力します。
メインスレッドは、5つのスレッドを作成して開始し、その後、すべてのスレッドが完了するのを待機してから終了します。
この例は、threading.Semaphore.acquire()
を使用して、共有リソースへのアクセスを制限し、スレッド間で競合が発生しないようにする方法を示しています。
以下のコードは、threading.Semaphore.acquire()
を使用して、プリンタへの排他アクセスを制御する例です。
import threading
import time
def print_document(name):
print(f"{name} が印刷を開始しました")
time.sleep(1.5)
print(f"{name} の印刷が完了しました")
semaphore = threading.Semaphore(max_value=1)
documents = ["文書1.txt", "文書2.pdf", "文書3.docx"]
for document in documents:
thread = threading.Thread(target=print_document, args=(document,))
thread.start()
# すべてのスレッドが完了するのを待つ
for thread in threading.active_count():
thread.join()
この例では、print_document
関数がsemaphore
を使用してプリンタへのアクセスを制御します。一度に1つのドキュメントしか印刷できないように設定されています。
各スレッドは、print_document
関数を呼び出され、ドキュメント名を引数として渡されます。関数内では、ドキュメント名とメッセージを出力し、1.5秒間スリープしてから、再度メッセージを出力します。
メインスレッドは、documents
リスト内の各ドキュメントに対してスレッドを作成して開始し、その後、すべてのスレッドが完了するのを待機してから終了します。
ロックオブジェクト
ロックオブジェクトは、共有リソースへの排他アクセスを制御するためのもう1つの一般的な方法です。threading.Lock
やmultiprocessing.Lock
などのロックオブジェクトを使用できます。
長所
- セmaphoreよりも軽量
- シンプルで使いやすい
短所
- 優先順位付けの機能がない: ロックを取得するスレッドの順序を制御できない
- 再入可能ではない: すでにロックを持っているスレッドが再度ロックしようとすると、デッドロックが発生する可能性がある
例
import threading
lock = threading.Lock()
def access_resource():
with lock:
# 共有リソースにアクセスする処理
pass
# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
thread = threading.Thread(target=access_resource)
thread.start()
イベント
イベントは、スレッド間の通信に使用できる同期プリミティブです。スレッドは、イベントを待機したり、シグナルを送信したりできます。
長所
- 優先順位付けが可能: イベントを待機するスレッドの順序を制御できる
- スレッド間の通信に使用できる
短所
- 共有リソースへの排他アクセスを制御するには、追加のロックメカニズムが必要
例
import threading
event = threading.Event()
def access_resource():
event.wait()
# 共有リソースにアクセスする処理
event.clear()
# 1つのスレッドがリソースにアクセスできるようにする
resource_access_thread = threading.Thread(target=access_resource)
resource_access_thread.start()
# 他のスレッドがリソースにアクセスできるようにシグナルを送信する
time.sleep(2)
event.set()
# すべてのスレッドが完了するのを待つ
resource_access_thread.join()
条件変数
条件変数は、ロックオブジェクトとイベントを組み合わせた機能を提供します。共有リソースへの排他アクセスを制御し、スレッド間の通信に使用できます。
長所
- 優先順位付けが可能
- スレッド間の通信に使用できる
- 共有リソースへの排他アクセスを制御できる
短所
- ロックオブジェクトとイベントよりも複雑
例
import threading
condition = threading.Condition()
def access_resource():
with condition:
condition.wait_for(lambda: resource_available)
# 共有リソースにアクセスする処理
resource_available = False
condition.notify_all()
resource_available = True
# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
thread = threading.Thread(target=access_resource)
thread.start()
キュー
キューは、スレッド間でデータをやり取りするためのもう1つの方法です。スレッドは、キューにアイテムを追加したり、キューからアイテムを取り出したりできます。
長所
- 優先順位付けが可能
- スレッド間でデータをやり取りするのに便利
短所
- 共有リソースへの排他アクセスを制御するには、追加のロックメカニズムが必要
import threading
import queue
q = queue.Queue()
def access_resource():
while True:
try:
item = q.get(block=True)
# 共有リソースにアクセスする処理
except queue.Empty:
break
# キューにアイテムを追加する
for i in range(10):
q.put(i)
# 複数のスレッドから `access_resource` 関数を呼び出す
for _ in range(10):
thread = threading.Thread(target=access_resource)
thread.start()