Pythonマルチプロセスでデータ共有!SyncManager徹底解説
Pythonの multiprocessing
モジュールは、スレッドではなくプロセス(OSのプロセス)を使って並列処理を実現します。これにより、PythonのGlobal Interpreter Lock (GIL) の制約を回避し、複数のCPUコアをフル活用することができます。しかし、プロセスはそれぞれ独立したメモリ空間を持つため、直接的にデータを共有することはできません。
そこで登場するのが Manager
です。SyncManager
は、Manager
の一種であり、プロセス間で共有される特定の種類のオブジェクト(リスト、辞書、キュー、ロック、セマフォなど)を生成し、管理するためのサーバープロセスを起動します。
SyncManager の主な役割と特徴
-
- 通常、プロセス間でデータをやり取りするには、キューやパイプなどの明示的なプロセス間通信 (IPC) メカニズムを使用する必要があります。
SyncManager
を使うと、共有されるデータ構造(例:list
、dict
)をマネージャーが管理するサーバープロセス上に作成できます。これらのオブジェクトは、あたかも通常のPythonオブジェクトであるかのように、複数のプロセスからアクセス・変更できます。- マネージャーが提供する共有オブジェクトは、内部的に同期メカニズムを備えているため、複数のプロセスから同時にアクセスしてもデータの一貫性が保たれます。例えば、
Manager().list()
で作成したリストは、複数のプロセスから要素の追加や削除を行っても競合状態が発生しにくいように設計されています。
-
GILの回避
multiprocessing
モジュール自体がGILを回避するためにプロセスを使用しますが、SyncManager
はさらに、共有されるオブジェクトに対する操作がマネージャーサーバーで行われることで、それぞれのクライアントプロセスがGILの制約を受けずに並列処理を進めることができます。
-
サーバープロセス
SyncManager
のインスタンスを作成すると、バックグラウンドで独立したサーバープロセスが起動します。このサーバープロセスが、共有されるオブジェクトの実体を保持し、各クライアントプロセスからのリクエストに応じて操作を実行します。- クライアントプロセスは、このサーバープロセスと通信することで、共有オブジェクトにアクセスします。
-
オブジェクトのプロキシ化
- クライアントプロセスが
SyncManager
を通じて共有オブジェクトを取得すると、実際にはそのオブジェクトの「プロキシ」が返されます。このプロキシを通じて行われた操作は、マネージャーサーバーに送信され、そこで実際のオブジェクトに対して実行されます。
- クライアントプロセスが
SyncManager で作成できる主な共有オブジェクトの例
Array()
: 配列を共有Value()
: 単一の値を共有Event()
: プロセス間で共有されるイベントオブジェクトSemaphore()
: プロセス間で共有されるセマフォLock()
: プロセス間で共有されるロック(排他制御)Queue()
: プロセス間で共有されるキュー(multiprocessing.Queue
とは異なり、マネージャーが管理する)dict()
: プロセス間で共有される辞書list()
: プロセス間で共有されるリスト
import multiprocessing
def worker(manager_list):
# マネージャーから提供された共有リストにアクセス
manager_list.append(multiprocessing.current_process().name)
print(f"プロセス {multiprocessing.current_process().name}: 現在のリストは {manager_list} です。")
if __name__ == "__main__":
# SyncManager のインスタンスを作成
with multiprocessing.Manager() as manager:
# マネージャーを通じて共有リストを作成
shared_list = manager.list()
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(shared_list,))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"メインプロセス: 最終的な共有リストは {shared_list} です。")
この例では、manager.list()
を使ってプロセス間で共有されるリストを作成しています。各ワーカープロセスはこの共有リストに自分のプロセス名を追加し、メインプロセスで最終的なリストの内容を確認しています。通常の方法でリストを渡すとコピーが渡されるため、各プロセスが個別のリストを操作してしまいますが、SyncManager
を使うことで複数のプロセスが同じリストを共有し、変更を反映させることができます。
BrokenPipeError / ConnectionRefusedError (接続が切れた、接続拒否)
原因
- リソース不足
システムのメモリやファイルディスクリプタが不足している場合に発生することがあります。 - シリアライズできないオブジェクトを共有しようとした
マネージャーを通じて共有されるオブジェクトは、プロセス間で通信するためにpickle化可能である必要があります。pickle化できないオブジェクト(例: Lambda関数、ローカルに定義された関数など)を共有しようとするとエラーになることがあります。 - プロキシオブジェクトの参照が失われた
マネージャーが管理しているオブジェクトのプロキシが、クライアントプロセス側で参照されなくなり、マネージャー側でガーベッジコレクションされてしまうと、後からそのオブジェクトにアクセスしようとしたときにエラーになります。 - マネージャープロセスが予期せず終了した
共有オブジェクトを管理しているマネージャーサーバーが、何らかの理由でクラッシュしたり、明示的に終了されずにプロセスが終了したりした場合に発生します。
トラブルシューティング
- リソースの使用状況を確認する
システムのメモリ使用量や開いているファイルディスクリプタの数などを確認し、不足していないか確認します。 - マネージャーのサーバーのログを確認する
マネージャーサーバーが内部でエラーを発生させている可能性があります。デバッグレベルのログを有効にすることで、問題の原因を特定できる場合があります。 - 共有するオブジェクトを確認する
カスタムクラスのインスタンスなどを共有する場合は、そのクラスがpickle化可能であることを確認してください。通常、クラスが適切に定義されていれば問題ありませんが、例えばファイルハンドルなどのシステムリソースを含むオブジェクトは共有できない場合があります。 - プロセスを適切に join() する
ワーカープロセスが終了するまでメインプロセスが待機するように、process.join()
を呼び出すようにしてください。これにより、マネージャーが管理するオブジェクトのクリーンアップが適切に行われる機会が増えます。 - with ステートメントを使用する
SyncManager
をwith
ステートメントと組み合わせて使用することで、マネージャーのライフサイクルを適切に管理し、不要になったときに自動的にシャットダウンされるようにします。import multiprocessing def worker(shared_list): # 処理 if __name__ == "__main__": with multiprocessing.Manager() as manager: shared_list = manager.list() # プロセスを生成・開始 # ...
AttributeError: 'SyncManager' object has no attribute 'shutdown' (Context Managerに関するエラー)
原因
- これは古いPythonのバージョン(特にPython 2.x系や初期のPython 3.x系)で、
SyncManager
をwith
ステートメントで使用しようとした際に発生することがあった既知のバグでした。BaseManager
やそのサブクラスのコンテキストマネージャーの実装に問題がありました。
トラブルシューティング
- 手動で start() と shutdown() を呼び出す (非推奨)
もし古いバージョンを使用せざるを得ない場合は、with
ステートメントを使わずに、手動でmanager.start()
を呼び出し、処理が完了したらmanager.shutdown()
を呼び出す必要があります。ただし、これはエラーハンドリングが複雑になるため推奨されません。import multiprocessing if __name__ == "__main__": manager = multiprocessing.Manager() manager.start() try: shared_list = manager.list() # 処理 finally: manager.shutdown()
- Pythonのバージョンを更新する
最新のPythonバージョンではこの問題は修正されています。Python 3.6以降であれば問題なく動作するはずです。
RuntimeError: "Cannot pickle local object..." (ローカルオブジェクトのpickle化に関するエラー)
原因
multiprocessing
モジュールでは、プロセス間でデータをやり取りするためにオブジェクトをpickle化します。そのため、ラムダ関数、ネストされた関数、ローカルスコープで定義されたクラスのインスタンスなど、pickle化できないオブジェクトをマネージャーを通じて共有しようとするとこのエラーが発生します。
トラブルシューティング
- 共有する情報の種類を見直す
本当にオブジェクト全体を共有する必要があるのか、それとも必要なデータだけを渡せば良いのかを検討します。シンプルなデータ型(数値、文字列、タプル、リスト、辞書など)は基本的に問題なく共有できます。 - 共有するオブジェクトをモジュールレベルで定義する
関数やクラスをメインスクリプトのトップレベル(グローバルスコープ)で定義することで、pickle化可能になります。import multiprocessing # グローバルスコープで定義 def my_function(data): return data * 2 class MySharedObject: def __init__(self, value): self.value = value def get_value(self): return self.value def worker(manager_dict): # 共有オブジェクトにアクセス print(manager_dict['func'](5)) obj = manager_dict['obj'] print(obj.get_value()) if __name__ == "__main__": with multiprocessing.Manager() as manager: shared_dict = manager.dict() shared_dict['func'] = my_function # グローバル関数はOK shared_dict['obj'] = MySharedObject(10) # グローバルクラスのインスタンスもOK p = multiprocessing.Process(target=worker, args=(shared_dict,)) p.start() p.join()
デッドロック (Deadlock)
原因
- 共有オブジェクトに対する競合
マネージャーが管理するオブジェクト自体はスレッドセーフですが、そのオブジェクトに対する複数の操作が不可分な一連の処理として実行されることを期待する場合、別途ロックやセマフォを使って明示的に同期をとる必要があります。例えば、共有リストへの追加と削除を複数のプロセスが同時に行い、それぞれが別の処理を待機するとデッドロックに陥ることがあります。 - ロックの不適切な使用
複数のプロセスが異なる順序で複数のロックを獲得しようとしたり、ロックを獲得したまま別のリソースを待機したりすると、デッドロックが発生します。
トラブルシューティング
- キューやパイプの使用を検討する
複雑な同期が必要な場合は、Queue
やPipe
など、よりシンプルなプロセス間通信メカニズムで問題を解決できないかを検討します。 - 共有オブジェクトの操作をシンプルに保つ
可能な限り、共有オブジェクトに対する操作をアトミック(不可分)なものにし、複雑な複数ステップの操作は避けます。 - タイムアウトを設定する
Lock.acquire(timeout=...)
のように、ロックの取得にタイムアウトを設定することで、デッドロックが発生しても無限に待機するのを防ぎ、エラーとして検出できます。 - ロックの取得順序を統一する
複数のロックを使用する場合、すべてのプロセスで同じ順序でロックを獲得するようにします。
メモリリーク (Memory Leak) / メモリ使用量の増加
原因
- 大量のデータを共有オブジェクトに格納する
大量のデータを共有リストや共有辞書に格納し続けると、マネージャーサーバーのメモリ使用量が増大します。 - マネージャーが管理するオブジェクトの参照が残り続ける
クライアントプロセスでマネージャーから取得したプロキシオブジェクトの参照を削除しても、マネージャーサーバー側でオブジェクトが適切に解放されない場合があります。これは、マネージャーサーバーがオブジェクトに対する「参照」を内部的に保持し続けるためです。特に、マネージャーのオブジェクトをリストや辞書に格納し、後からそのリストや辞書から削除しても、マネージャー側の参照カウントが減らないケースが報告されています。
トラブルシューティング
- カスタムの BaseManager を使用し、参照管理を改善する
より高度なケースでは、BaseManager
を継承して独自のマネージャーを作成し、参照カウントのメカニズムをより細かく制御することが考えられます。ただし、これは複雑な作業になります。 - 共有データの量を制限する
大量のデータを共有する必要がある場合は、データを小さなチャンクに分割したり、データベースやファイルなどの永続的なストレージを使用したりするなど、代替手段を検討します。 - SyncManager._registry や _id_to_obj をデバッグ目的で確認する
Pythonの内部実装に関わる部分ですが、マネージャーサーバーが管理しているオブジェクトのリスト(_id_to_obj
など)をデバッグ時に確認することで、不要なオブジェクトが残っていないか調査できます。 - 不要になったオブジェクトを明示的に削除する
マネージャーから取得した共有オブジェクトが不要になったら、明示的にdel
で参照を削除したり、コンテナ(リストや辞書)から削除したりすることで、マネージャーの参照カウントが減ることを期待します。ただし、これが常に期待通りに機能するとは限りません。
__main__ ガードの欠如
原因
multiprocessing
モジュールを使用する際には、プロセスを起動するコードがif __name__ == "__main__":
ブロック内にある必要があります。これが不足していると、子プロセスがモジュールを再インポートする際に無限ループに陥ったり、予期せぬエラーが発生したりします。
トラブルシューティング
- 必ず if __name__ == "__main__": でメイン処理を囲む
これはmultiprocessing
を使用する上での基本的なルールです。import multiprocessing def worker(shared_data): # 処理 if __name__ == "__main__": with multiprocessing.Manager() as manager: shared_data = manager.dict() p = multiprocessing.Process(target=worker, args=(shared_data,)) p.start() p.join()
原因
SyncManager
は通常、同じマシン上のプロセス間で共有するために設計されていますが、ネットワーク越しにマネージャーサービスに接続することも可能です。この際、ファイアウォールの設定、ネットワークの不安定性、認証キーの不一致などが原因で接続が失敗したり、BrokenPipeError
が発生したりすることがあります。
トラブルシューティング
- セキュリティの考慮
ネットワーク越しにマネージャーサービスを公開する場合、セキュリティリスクが増大します。適切な認証、暗号化、アクセス制限を検討する必要があります。 - ネットワークの安定性
ネットワーク接続が不安定な場合、通信エラーが発生しやすくなります。 - 認証キーの一致
Manager()
のauthkey
引数を使用して認証キーを設定している場合、クライアントとサーバーの両方で同じキーを設定する必要があります。 - ファイアウォール設定の確認
マネージャーサーバーが使用するポートが、クライアント側からアクセス可能になっていることを確認します。
SyncManager
は強力ですが、プロセス間通信の複雑さを内包しています。エラーが発生した際は、以下の点を順に確認していくと良いでしょう。
if __name__ == "__main__":
ガードの確認with multiprocessing.Manager() as manager:
の使用- 共有するオブジェクトがpickle化可能であるか
- プロセスの
join()
が適切に行われているか - ロックやセマフォの使用方法(デッドロックの可能性)
- メモリ使用量の推移
例1: 共有リスト (manager.list()
) の使用
最も基本的な例で、複数のプロセスが共通のリストに要素を追加する様子を示します。
import multiprocessing
import time
def worker_add_to_list(shared_list, process_id):
"""
共有リストにプロセスIDを追加するワーカー関数
"""
print(f"プロセス {process_id}: 共有リストに {process_id} を追加します。")
shared_list.append(process_id)
time.sleep(0.1) # 少し待機して、並行実行をシミュレート
print(f"プロセス {process_id}: 追加後、共有リスト: {list(shared_list)}") # list() でコピーを取得して表示
if __name__ == "__main__":
print("--- 共有リスト (manager.list()) の例 ---")
# SyncManager のインスタンスを作成
# 'with' ステートメントを使うことで、マネージャーのライフサイクルを適切に管理できます。
with multiprocessing.Manager() as manager:
# マネージャーを通じて共有リストを作成
# このリストは、マネージャーサーバーによって管理され、複数のプロセスからアクセス可能です。
shared_list = manager.list()
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker_add_to_list, args=(shared_list, i))
processes.append(p)
p.start() # プロセスを開始
# すべてのワーカープロセスが終了するのを待つ
for p in processes:
p.join()
print(f"\nメインプロセス: すべてのプロセスが終了しました。")
print(f"メインプロセス: 最終的な共有リスト: {list(shared_list)}") # 共有リストの最終状態を表示
print("\n--- 例1 終了 ---")
解説
- 出力を見ると、異なるプロセスが同じリストに同時に要素を追加していることがわかります。
list(shared_list)
のように明示的にlist()
コンストラクタでコピーを作成しないと、プロキシオブジェクトがそのまま表示される場合があります。ここでは内容確認のためコピーを取得しています。manager.list()
で作成されたshared_list
は、各プロセスに「プロキシ」として渡されます。各プロセスがこのプロキシを通じてリストを操作すると、実際の操作はマネージャーサーバー上で行われます。multiprocessing.Manager()
をwith
ステートメントで使うことで、マネージャーサーバーが自動的に起動し、ブロックを抜けるときにシャットダウンされます。
例2: 共有辞書 (manager.dict()
) の使用
複数のプロセスが共通の辞書にキーと値を設定する例です。
import multiprocessing
import time
import random
def worker_update_dict(shared_dict, process_id):
"""
共有辞書にプロセスIDとランダムな値を設定するワーカー関数
"""
key = f"process_{process_id}"
value = random.randint(100, 999)
print(f"プロセス {process_id}: 辞書に '{key}': {value} を設定します。")
shared_dict[key] = value
time.sleep(0.1)
print(f"プロセス {process_id}: 設定後、共有辞書: {dict(shared_dict)}")
if __name__ == "__main__":
print("\n--- 共有辞書 (manager.dict()) の例 ---")
with multiprocessing.Manager() as manager:
# マネージャーを通じて共有辞書を作成
shared_dict = manager.dict()
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker_update_dict, args=(shared_dict, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"\nメインプロセス: すべてのプロセスが終了しました。")
print(f"メインプロセス: 最終的な共有辞書: {dict(shared_dict)}")
print("\n--- 例2 終了 ---")
解説
dict(shared_dict)
で共有辞書のコピーを取得し、その時点の内容を確認しています。- 各プロセスは、自身のIDをキーとしてランダムな値を辞書に設定します。
manager.dict()
を使って共有辞書を作成しています。
例3: 共有ロック (manager.Lock()
) と共有値 (manager.Value()
) の使用
複数のプロセスが共有のカウンタをインクリメントする際、競合状態を避けるためにロックを使用する例です。
import multiprocessing
import time
def worker_increment_counter(shared_value, shared_lock, process_id):
"""
共有カウンタを安全にインクリメントするワーカー関数
"""
for _ in range(5):
# ロックを獲得
# 'with' ステートメントを使うことで、ロックの解放が自動的に行われます。
with shared_lock:
current_value = shared_value.value
print(f"プロセス {process_id}: ロック取得。現在値: {current_value}")
time.sleep(0.01) # ロックを保持したまま少し待機
shared_value.value = current_value + 1
print(f"プロセス {process_id}: ロック解放。新値: {shared_value.value}")
time.sleep(0.05) # ロック解放後、少し待機
if __name__ == "__main__":
print("\n--- 共有ロック (manager.Lock()) と共有値 (manager.Value()) の例 ---")
with multiprocessing.Manager() as manager:
# 共有整数値を作成 (初期値は0)
# 'i' は整数型 (integer) を意味します。
shared_counter = manager.Value('i', 0)
# 共有ロックを作成
shared_lock = manager.Lock()
processes = []
num_processes = 3
for i in range(num_processes):
p = multiprocessing.Process(target=worker_increment_counter,
args=(shared_counter, shared_lock, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"\nメインプロセス: すべてのプロセスが終了しました。")
# 期待される最終値は 3プロセス * 5回インクリメント = 15
print(f"メインプロセス: 最終的なカウンタ値: {shared_counter.value}")
print("\n--- 例3 終了 ---")
解説
- もしロックなしで実行した場合、最終的なカウンタ値は期待通りに15にならない可能性が高いです。
- 各プロセスは、
shared_lock
をwith
ステートメントで獲得してからshared_counter.value
をインクリメントします。これにより、複数のプロセスが同時にカウンタを更新しようとしてデータが壊れる(競合状態)ことを防ぎます。 manager.Lock()
は、共有リソースへのアクセスを排他的に制御するためのロックオブジェクトを生成します。manager.Value('i', 0)
は、型コード'i'
(整数) で初期値0
の共有可能な単一の値を生成します。
SyncManager
は、Pythonの標準のデータ構造だけでなく、ユーザー定義のクラスのインスタンスも共有できます。これには、マネージャーにカスタムのプロキシオブジェクトを登録する必要があります。
注意
カスタムオブジェクトを共有する場合、そのオブジェクトはpickle化可能である必要があります。また、プロキシを通じて行われる操作は、マネージャーサーバー上で実行されるため、パフォーマンスの考慮が必要です。
import multiprocessing
import time
# 共有したいカスタムクラス
class MyData:
def __init__(self, name, value):
self.name = name
self.value = value
self.updates = [] # 更新履歴
def increment(self, amount):
self.value += amount
self.updates.append(f"Incremented by {amount}")
def get_info(self):
return f"Name: {self.name}, Value: {self.value}, Updates: {self.updates}"
# カスタムクラスのプロキシを登録するための関数
# マネージャーが認識できるように、クラス名と操作を登録する
def register_my_data(manager):
# 'MyData' という型名を指定し、MyData クラスと、そのメソッドのリストを登録
manager.register('MyData', MyData, MyData.increment, MyData.get_info)
def worker_update_custom_object(shared_data_proxy, process_id):
"""
共有カスタムオブジェクトを更新するワーカー関数
"""
print(f"プロセス {process_id}: 共有オブジェクトの現在: {shared_data_proxy.get_info()}")
time.sleep(0.1)
shared_data_proxy.increment(process_id + 1)
print(f"プロセス {process_id}: 更新後: {shared_data_proxy.get_info()}")
time.sleep(0.1)
if __name__ == "__main__":
print("\n--- カスタムオブジェクトの共有 (Proxy オブジェクトの登録) の例 ---")
# BaseManager を継承してカスタムマネージャーを作成
# このマネージャーは、MyData オブジェクトを管理できるようにする
class MyCustomManager(multiprocessing.managers.BaseManager):
pass
# カスタムマネージャーに MyData を登録
MyCustomManager.register('MyData', MyData)
# カスタムマネージャーのインスタンスを作成
with MyCustomManager() as manager:
# マネージャーを通じてカスタムオブジェクトを作成
# ここで MyData() のコンストラクタがマネージャーサーバー上で呼び出される
shared_custom_object = manager.MyData("Shared Item", 100)
processes = []
num_processes = 3
for i in range(num_processes):
p = multiprocessing.Process(target=worker_update_custom_object,
args=(shared_custom_object, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"\nメインプロセス: すべてのプロセスが終了しました。")
print(f"メインプロセス: 最終的なカスタムオブジェクトの状態: {shared_custom_object.get_info()}")
print("\n--- 例4 終了 ---")
- ワーカープロセスは、このプロキシを通じて
increment
やget_info
といったメソッドを呼び出すことができます。これらのメソッドの実行はマネージャーサーバーで行われます。 manager.MyData(...)
のように、登録した名前でカスタムオブジェクトのインスタンスをマネージャーサーバー上に作成できます。- MyCustomManager.register('MyData', MyData)
ここが重要です。register
メソッドを使って、マネージャーがMyData
という名前でMyData
クラスのインスタンスを生成・管理できるように登録します。第2引数は、インスタンス化する実際のクラスです。 - MyCustomManager クラス
multiprocessing.managers.BaseManager
を継承して、カスタムマネージャーを作成します。 - MyData クラス
共有したいカスタムデータ構造を定義します。
ここでは、SyncManager
の主な代替方法をいくつか紹介します。
キュー (multiprocessing.Queue)
利点
- スケーラビリティ
多くのケースで、SyncManager
よりも高いスループットを達成できます。 - デッドロックの回避
適切な使用により、共有メモリにおけるデッドロックの問題を回避しやすいです。 - 堅牢性
プロセスがクラッシュした場合でも、キューは比較的安全に処理されます。 - シンプルさ
メッセージングモデルは直感的で理解しやすいです。
欠点
- データのコピー
キューを通じてデータを渡す際、通常はデータがシリアライズされてコピーされるため、非常に大きなデータを頻繁にやり取りする場合にはオーバーヘッドが生じます。 - 複雑なデータ構造の共有が難しい
共有オブジェクトを直接変更するのではなく、データを「送受信」するモデルのため、複雑な状態を持つオブジェクトを継続的に同期させるのには向きません。
使用例
タスクの配布や結果の収集など、メッセージパッシングに適しています。
import multiprocessing
import time
def worker_queue(task_queue, result_queue):
while True:
task = task_queue.get() # タスクを取得
if task is None: # 終了シグナル
break
print(f"プロセス {multiprocessing.current_process().name}: タスク {task} を処理中...")
time.sleep(0.1) # 処理のシミュレーション
result_queue.put(task * 2) # 結果を返す
if __name__ == "__main__":
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker_queue, args=(task_queue, result_queue))
processes.append(p)
p.start()
# タスクを投入
for i in range(10):
task_queue.put(i)
# 終了シグナルを送信
for _ in range(3):
task_queue.put(None)
# 結果を収集
results = []
for _ in range(10):
results.append(result_queue.get())
for p in processes:
p.join()
print(f"収集された結果: {sorted(results)}")
パイプ (multiprocessing.Pipe)
利点
- 双方向通信
Pipe(duplex=True)
で双方向パイプを作成できます。 - シンプルで高速
非常に低レベルなIPCメカニズムであり、直接的なデータ転送に適しています。
欠点
- データのコピー
キューと同様に、データはコピーされます。 - 2つのプロセス間の通信
2つ以上のプロセス間の通信には向いていません(多対多の通信にはキューの方が適しています)。
使用例
親プロセスが子プロセスにコマンドを送り、子プロセスが結果を返すような、1対1の通信に適しています。
import multiprocessing
import time
def worker_pipe(conn):
while True:
try:
msg = conn.recv() # メッセージを受信
if msg == "終了":
break
print(f"プロセス {multiprocessing.current_process().name}: メッセージ '{msg}' を受信")
conn.send(f"応答: {msg.upper()}") # 応答を送信
except EOFError: # パイプが閉じられた場合
break
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe() # 双方向パイプを作成
p = multiprocessing.Process(target=worker_pipe, args=(child_conn,))
p.start()
messages = ["Hello", "World", "Python", "終了"]
for msg in messages:
print(f"メインプロセス: '{msg}' を送信")
parent_conn.send(msg)
if msg != "終了":
response = parent_conn.recv()
print(f"メインプロセス: 応答 '{response}' を受信")
time.sleep(0.1)
p.join()
print("メインプロセス: 終了")
共有メモリ (multiprocessing.shared_memory) - Python 3.8以降
利点
- インプレース更新
共有メモリ上のデータを直接更新できます。 - 高速なデータアクセス
データをコピーする必要がないため、非常に大きなデータセットを扱う場合にパフォーマンス上の大きな利点があります。
欠点
- 複雑性
低レベルなAPIのため、使用がより複雑になります。 - 同期メカニズムが必要
共有メモリを安全に利用するためには、別途ロックやセマフォなどの同期プリミティブ(multiprocessing.Lock
,multiprocessing.Semaphore
など)を使用して、競合状態を避ける必要があります。SyncManager
のように、共有データ構造自体に同期機能が組み込まれているわけではありません。 - 生のバイト列
基本的に生バイト列を扱います。NumPy配列のような高レベルなデータ構造を扱うには、別途ラッパーやアライメントの考慮が必要です。
使用例
大規模なNumPy配列や画像データなど、共有・更新する必要がある場合に非常に有効です。
import multiprocessing
from multiprocessing import shared_memory
import numpy as np
import time
def worker_shm(shm_name, shape, dtype, lock):
# 共有メモリにアタッチ
existing_shm = shared_memory.SharedMemory(name=shm_name)
# NumPy配列として扱う
shared_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
for _ in range(5):
with lock: # 共有メモリへのアクセスをロックで保護
current_sum = np.sum(shared_array)
print(f"プロセス {multiprocessing.current_process().name}: 現在の合計値: {current_sum}")
shared_array[:] += 1 # 配列の全要素をインクリメント
print(f"プロセス {multiprocessing.current_process().name}: インクリメント後: {np.sum(shared_array)}")
time.sleep(0.1)
existing_shm.close() # 共有メモリを切断
if __name__ == "__main__":
print("\n--- 共有メモリ (multiprocessing.shared_memory) の例 ---")
data_shape = (5, 5)
data_dtype = np.int32
initial_data = np.zeros(data_shape, dtype=data_dtype)
# 共有メモリブロックを作成
shm = shared_memory.SharedMemory(create=True, size=initial_data.nbytes)
# NumPy配列としてラップ
shared_array = np.ndarray(data_shape, dtype=data_dtype, buffer=shm.buf)
shared_array[:] = initial_data[:] # 初期データをコピー
lock = multiprocessing.Lock() # 共有メモリへのアクセスを保護するためのロック
processes = []
for i in range(2):
p = multiprocessing.Process(target=worker_shm,
args=(shm.name, data_shape, data_dtype, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"\nメインプロセス: 最終的な共有配列の合計値: {np.sum(shared_array)}")
# 共有メモリをクローズし、解放
shm.close()
shm.unlink() # 共有メモリブロックをOSから削除
print("\n--- 例3 終了 ---")
データベース (SQLite, Redis など)
利点
- 同時アクセス制御
多くのデータベースには、複数のクライアントからの同時アクセスを管理するための堅牢なメカニズムが組み込まれています。 - 言語非依存
Pythonだけでなく、他の言語のプロセスともデータを共有できます。 - 構造化データ
複雑なデータを構造化して管理できます。 - 永続性
プロセスが終了してもデータが失われません(インメモリDBを除く)。
欠点
- セットアップ
データベースのセットアップと管理が必要になる場合があります。 - オーバーヘッド
データベースへの接続、クエリの実行、結果の取得には、ネイティブなIPCよりも大きなオーバーヘッドが生じます。
使用例
複数のワーカープロセスが、共有の作業キューや結果ストアとしてデータベースを利用する。
ファイルシステム (ファイルロックなど)
利点
- 永続性
データはファイルとして永続的に保存されます。 - シンプル
最も基本的な共有方法の一つ。
欠点
- 競合状態の管理
ファイルロックが正しく実装されていないと、データの破損を招きやすいです。 - 複雑なデータ
構造化されたデータを扱うのが難しい場合があります。 - パフォーマンス
ディスクI/Oがボトルネックになるため、パフォーマンスは一般的に低いです。
使用例
設定ファイル、進捗状況のログ、非常にまれにしか更新されない共有データなど。
- 最もシンプルで基本的なデータ共有、または永続性が必要だがパフォーマンスが重視されない場合
ファイルシステム - データが永続的である必要があり、複数の異なるアプリケーションやシステムからのアクセスがある場合
データベース (SQLite, Redis, PostgreSQLなど) - 非常に大きなデータセット(NumPy配列など)を、コピーせずに高速に共有したい場合
multiprocessing.shared_memory
(別途同期が必要) - 複数のプロセス間で複雑なPythonオブジェクトを共有し、変更を同期させたい場合
multiprocessing.managers.SyncManager
(ただし、パフォーマンスとメモリオーバーヘッドに注意) - 1対1の親子プロセス通信
multiprocessing.Pipe
- 簡単なメッセージング、タスクキュー、結果収集
multiprocessing.Queue