Python multiprocessing.Pool.apply()徹底解説:同期処理の基本と活用

2025-05-27

multiprocessing.pool.Pool.apply()とは?

Pool.apply(func, args=(), kwds={})は、Pool内の単一のワーカープロセスで指定された関数funcを実行するためのメソッドです。

重要な特徴は以下の通りです。

  1. ブロッキング呼び出し: apply()メソッドを呼び出すと、指定された関数が完了し、結果が返されるまで現在のプロセス(呼び出し元のプロセス)はブロックされます。つまり、関数が終了するまで次の処理には進みません。

  2. 単一タスクの実行: apply()は、一度に一つのタスクを実行するために設計されています。複数の異なる引数で同じ関数を並列に実行したい場合は、後述するmap()apply_async()の方が適しています。

  3. 引数の指定:

    • func: 実行したい関数。
    • args: 関数に渡す位置引数をタプルで指定します。
    • kwds: 関数に渡すキーワード引数を辞書で指定します。

apply()の使用例

import multiprocessing
import time

def my_function(x, y):
    """時間がかかる処理をシミュレートする関数"""
    print(f"プロセス {multiprocessing.current_process().name}{x}{y} を処理中...")
    time.sleep(2) # 2秒待機
    return x * y

if __name__ == '__main__':
    # 3つのワーカープロセスを持つPoolを作成
    with multiprocessing.Pool(processes=3) as pool:
        print("apply()を呼び出します。関数が完了するまでブロックされます。")
        
        # my_functionを単一のプロセスで実行
        # この呼び出しは、my_functionが完了するまで待機します
        result = pool.apply(my_function, args=(5, 10))
        
        print(f"結果: {result}")
        print("apply()が完了し、次の処理に進みました。")

        print("\n別のapply()呼び出し(これもブロックされます)")
        result2 = pool.apply(my_function, args=(7, 3))
        print(f"2番目の結果: {result2}")

このコードを実行すると、pool.apply(my_function, args=(5, 10))が呼び出された後、my_functionが完了するまでプログラムの実行が一時停止し、結果が返されてから次のprint文が実行されることがわかります。

apply()はシンプルで直感的ですが、並列処理の恩恵を最大限に受けるためには、他の非ブロッキングメソッドや一括処理メソッドを理解することが重要です。

  • starmap(func, iterable, chunksize=None):

    • map()に似ていますが、iterableの各要素が関数の引数のタプルである場合に便利です。つまり、func(*args)のようにアンパックして引数を渡します。
  • map(func, iterable, chunksize=None):

    • ブロッキング: map()は、iterable内の各要素に対してfuncを実行し、すべての結果が揃うまでブロックされます。
    • データ並列処理: 多数の同じようなタスク(同じ関数を異なる入力データで実行する)を並列処理するのに最適です。Pythonの組み込みmap()関数に似ていますが、複数のプロセスで実行されます。
    • 結果の順序保証: 入力の順序と結果の順序が一致することが保証されます。
  • apply_async(func, args=(), kwds={}, callback=None, error_callback=None):

    • 非ブロッキング: apply_async()は関数をワーカープロセスに投入するとすぐに制御を返します。結果はAsyncResultオブジェクトとして返され、後からget()メソッドで取得できます。get()を呼び出すと、結果が利用可能になるまでブロックされます。
    • コールバック: 処理が完了したときに実行されるコールバック関数や、エラー発生時に実行されるコールバック関数を指定できます。
    • 用途: 個々のタスクを非同期に実行し、メインプロセスで他の処理を継続したい場合に適しています。


PicklingError: Can't pickle ... (Pickleできないオブジェクトのエラー)

これがmultiprocessingを使用する上で最も頻繁に遭遇するエラーの一つです。multiprocessingは、タスクをワーカープロセスに送信するために、関数やその引数、戻り値を「pickle化」(直列化)してプロセス間で通信します。もし、pickle化できないオブジェクト(例えば、ローカル関数、lambda関数、クラス内のインスタンスメソッドで、トップレベルに定義されていないものなど)を渡そうとすると、このエラーが発生します。

原因

  • クローズされたファイルハンドルやネットワーク接続など、pickleできないオブジェクトが引数に含まれている。
  • クラスのインスタンスメソッドを直接渡そうとしている(クラスのインスタンス自体がpickle可能である必要がある)。
  • lambda関数やネストされた関数を使用している。
  • 関数がグローバルスコープ(モジュールのトップレベル)に定義されていない。

解決策

  • 関数をモジュールのトップレベルに定義する: ほとんどの場合、これが最も簡単で推奨される解決策です。

<!-- end list -->

import multiprocessing

# グローバルスコープに定義された関数
def worker_function(data):
    return data * 2

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.apply(worker_function, args=(10,))
        print(result) # 20
  • クラスのインスタンスメソッドを使用する場合: メソッドを持つクラスのインスタンスがpickle可能であること、そしてそのメソッドが呼び出せる形になっていることを確認してください。

  • Windows環境での注意: Windowsでは、multiprocessingforkではなくspawn方式でプロセスを起動します。このため、if __name__ == '__main__':ブロック内にマルチプロセシング関連のコードを記述することが必須です。そうしないと、子プロセスがメインモジュールを再インポートしようとしたときに無限ループに陥る可能性があります。

import multiprocessing

class MyProcessor:
    def __init__(self, multiplier):
        self.multiplier = multiplier

    def process(self, data):
        return data * self.multiplier

if __name__ == '__main__':
    processor = MyProcessor(5)
    with multiprocessing.Pool(processes=2) as pool:
        # クラスのインスタンスメソッドを渡す
        result = pool.apply(processor.process, args=(10,))
        print(result) # 50

この場合、processorインスタンスがpickle可能であることが前提です。

デッドロック(処理が停止する)

apply()はブロッキング呼び出しであるため、誤った使い方をするとデッドロックが発生することがあります。

原因

  • I/Oブロッキング: ワーカープロセス内の関数がネットワークI/OやディスクI/Oなどで長時間ブロックされると、apply()を呼び出したメインプロセスもブロックされ続けるため、全体の処理が停止したように見えることがあります。
  • ワーカープロセス内でさらにPoolを作成する: ワーカープロセス内でさらにPoolを作成しようとすると、デッドロックやリソース枯渇を引き起こす可能性が高いです。
  • Pooljoin()またはclose()を忘れる: withステートメントを使用しない場合、明示的にpool.close()を呼び出してワーカープロセスの終了をマークし、その後pool.join()を呼び出してすべてのワーカープロセスが完了するのを待つ必要があります。これを怠ると、プログラムが終了せず、ワーカープロセスがリソースを占有し続ける可能性があります。

解決策

  • withステートメントの使用: 最も推奨される方法です。with multiprocessing.Pool(...) as pool:を使用すると、pool.close()pool.join()が自動的に適切なタイミングで呼び出されるため、デッドロックのリスクが大幅に軽減されます。
import multiprocessing
import time

def my_function(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    # withステートメントを使用することで、close()とjoin()が自動で呼ばれる
    with multiprocessing.Pool(processes=2) as pool:
        print("apply()を呼び出し中...")
        result = pool.apply(my_function, args=(5,))
        print(f"結果: {result}")
    print("Poolが正常に終了しました。")
  • タイムアウトの設定: apply()にはタイムアウトの引数はありませんが、apply_async()にはtimeout引数があり、デッドロックが疑われる場合に特定の時間で処理を打ち切ることができます。
  • ネストされたPoolの回避: ワーカープロセス内で新たなPoolを作成するような設計は避けるべきです。

メモリ消費の増加

multiprocessingはプロセスベースの並列処理であるため、各ワーカープロセスは独立したメモリ空間を持ちます。

原因

  • ワーカープロセス内でのメモリリーク: ワーカープロセス内でメモリが解放されずに蓄積されると、全体のメモリ消費が増大します。
  • 大きなデータのコピー: メインプロセスからワーカープロセスに大きなデータを渡す場合、データはpickle化されてコピーされるため、メモリ使用量が大幅に増加する可能性があります。

解決策

  • データの分割: 大きなデータを小さなチャンクに分割し、それぞれのチャンクをワーカープロセスに渡すことを検討します。
  • ジェネレータやイテレータの使用: map()imap()のようなメソッドで大きなイテラブルを扱う際に、一度にすべてのデータをメモリにロードしないようにします。ただし、apply()は単一のタスクを処理するため、この恩恵は小さいです。
  • 共有メモリの使用: multiprocessing.shared_memorymultiprocessing.Arraymultiprocessing.Valueなどを使用して、プロセス間でデータを共有することで、メモリコピーを減らせます。

例外処理が難しい

ワーカープロセスで発生した例外は、メインプロセスに伝播されますが、デバッグが難しい場合があります。

原因

  • 例外のトレースバックが不完全な場合がある。
  • ワーカープロセスで発生した例外が、メインプロセスでそのまま表示されないことがある。

解決策

  • apply_async()error_callbackを使用する: apply_async()を使用している場合、エラーが発生したときに実行されるコールバック関数を指定できます。これにより、エラーの詳細を記録したり、適切に処理したりすることが可能です。apply()にはerror_callbackはありません。
  • try-exceptブロックでワーカー関数を囲む: ワーカー関数内で明示的に例外を捕捉し、ロギングを行うことで、問題の特定を容易にできます。

特にWindows環境では、multiprocessingのコードをif __name__ == '__main__':ブロックの中に記述することが必須です。

原因

  • LinuxなどのUnix系OSではforkを使用するため、通常は必須ではありませんが、コードの移植性を考慮すると常にこのブロックを使用することが推奨されます。
  • Windowsでは、新しいプロセスを起動する際に、元のスクリプト全体を再インポートします。このとき、if __name__ == '__main__':がないと、multiprocessing.Pool()の呼び出しが無限に繰り返され、エラーやプロセスが停止する原因となります。

解決策

  • すべてのmultiprocessing関連のコードを、このブロックの中に記述します。
import multiprocessing

def some_task(data):
    return data * 2

if __name__ == '__main__': # このブロックが重要!
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.apply(some_task, args=(10,))
        print(result)


基本的な使用例

例1: 簡単な計算の実行

この例では、シンプルな計算を行う関数をapply()を使って実行します。

import multiprocessing
import time
import os

# ワーカープロセスで実行される関数
def calculate_square(number):
    """与えられた数値の二乗を計算し、プロセス情報を表示する"""
    process_id = os.getpid()
    process_name = multiprocessing.current_process().name
    print(f"プロセスID: {process_id}, プロセス名: {process_name} - {number} の二乗を計算中...")
    time.sleep(1) # 処理に時間がかかることをシミュレート
    return number * number

if __name__ == '__main__':
    print("メインプロセス開始...")
    
    # 2つのワーカープロセスを持つPoolを作成
    # 'with' ステートメントを使うことで、Poolが自動的に閉じられ、プロセスが結合されます
    with multiprocessing.Pool(processes=2) as pool:
        print("\n--- apply() 呼び出し1 ---")
        # apply()はブロッキングなので、この行で処理が一時停止します
        result1 = pool.apply(calculate_square, args=(5,))
        print(f"結果1: {result1}") # 25

        print("\n--- apply() 呼び出し2 ---")
        # 別のapply()呼び出し。これも前の処理が完了してから実行され、完了までブロックされます。
        result2 = pool.apply(calculate_square, args=(10,))
        print(f"結果2: {result2}") # 100

    print("\nメインプロセス終了。")

解説

  • 2回目のapply()呼び出しも同様にブロックされます。
  • pool.apply(calculate_square, args=(5,))は、calculate_square関数を引数5Pool内の利用可能なワーカープロセスの一つで実行します。この呼び出しは、calculate_square(5)が完了し、25という結果が返されるまで、メインプロセスをブロックします。
  • multiprocessing.Pool(processes=2)で、最大2つのワーカープロセスを持つプールを作成します。
  • if __name__ == '__main__':ブロックは、multiprocessingを使用する際に非常に重要です。特にWindows環境では必須であり、これがないと子プロセスがスクリプトを再インポートしようとして無限ループに陥る可能性があります。
  • calculate_square関数は、与えられた数値の二乗を計算し、どのプロセスで実行されたかを表示します。time.sleep(1)で、処理に時間がかかることをシミュレートしています。

より複雑な引数を渡す例

例2: 複数の引数とキーワード引数

apply()は、位置引数(タプル)とキーワード引数(辞書)の両方を受け取ることができます。

import multiprocessing
import time
import os

def process_data(value1, value2, operation="add"):
    """
    2つの値に対して指定された操作を実行する。
    デフォルトは加算。
    """
    process_id = os.getpid()
    process_name = multiprocessing.current_process().name
    print(f"プロセスID: {process_id}, プロセス名: {process_name} - {value1}{value2}{operation} を実行中...")
    time.sleep(0.5)

    if operation == "add":
        return value1 + value2
    elif operation == "multiply":
        return value1 * value2
    else:
        return "無効な操作"

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        print("\n--- apply() (位置引数のみ) ---")
        result1 = pool.apply(process_data, args=(10, 5))
        print(f"結果1 (加算): {result1}") # 15

        print("\n--- apply() (位置引数とキーワード引数) ---")
        # argsはタプル、kwdsは辞書
        result2 = pool.apply(process_data, args=(7, 3), kwds={"operation": "multiply"})
        print(f"結果2 (乗算): {result2}") # 21

        print("\n--- apply() (デフォルト操作) ---")
        result3 = pool.apply(process_data, args=(20, 8))
        print(f"結果3 (デフォルト加算): {result3}") # 28

解説

  • 3回目のapply()では、再度デフォルトの操作が適用されます。
  • 2回目のapply()では、args=(7, 3)に加えてkwds={"operation": "multiply"}を渡し、乗算を実行します。
  • 1回目のapply()では、args=(10, 5)のみを渡し、operationはデフォルトの"add"になります。
  • process_data関数は、2つの数値とoperationというキーワード引数を受け取ります。

クラスのインスタンスメソッドを呼び出す例

例3: クラスのインスタンスメソッドの使用

apply()は、トップレベルで定義された関数だけでなく、pickle可能なクラスのインスタンスメソッドも呼び出すことができます。

import multiprocessing
import time
import os

class Calculator:
    def __init__(self, offset):
        self.offset = offset
        print(f"Calculatorインスタンス作成 (オフセット: {self.offset})")

    def add_offset(self, number):
        """数値にオフセットを加えて返す"""
        process_id = os.getpid()
        process_name = multiprocessing.current_process().name
        print(f"プロセスID: {process_id}, プロセス名: {process_name} - {number}{self.offset} を加算中...")
        time.sleep(0.7)
        return number + self.offset

if __name__ == '__main__':
    print("メインプロセス開始...")
    
    my_calculator = Calculator(100) # メインプロセスでCalculatorインスタンスを作成

    with multiprocessing.Pool(processes=2) as pool:
        print("\n--- apply()でインスタンスメソッドを呼び出し ---")
        # my_calculator.add_offset メソッドを渡す
        # my_calculator オブジェクトがpickle化されて子プロセスに送られる
        result1 = pool.apply(my_calculator.add_offset, args=(50,))
        print(f"結果1: {result1}") # 150

        print("\n--- 別のインスタンスとapply() ---")
        another_calculator = Calculator(200)
        result2 = pool.apply(another_calculator.add_offset, args=(30,))
        print(f"結果2: {result2}") # 230

    print("\nメインプロセス終了。")

解説

  • 別のanother_calculatorインスタンスを作成し、それもapply()で実行できることを示しています。
  • pool.apply(my_calculator.add_offset, args=(50,))のように、インスタンスのメソッドを直接渡すことができます。このとき、my_calculatorインスタンス(およびその状態)がpickle化され、ワーカープロセスにコピーされます。
  • Calculatorクラスは、コンストラクタでoffsetを受け取り、add_offsetメソッドでそのオフセットを数値に加えます。

例4: ブロッキング動作の明確な確認

この例では、apply()が本当にブロッキングであるかを確認します。

import multiprocessing
import time

def long_running_task(duration):
    """指定された時間だけ処理を停止する関数"""
    print(f"ワーカープロセス開始: {duration}秒間スリープします...")
    time.sleep(duration)
    print(f"ワーカープロセス終了: {duration}秒間のスリープが完了しました。")
    return f"完了: {duration}秒"

if __name__ == '__main__':
    print("メインプロセス: Poolを初期化中...")
    with multiprocessing.Pool(processes=1) as pool: # プロセス数を1に設定
        print("メインプロセス: apply()を呼び出し中...")
        
        # このapply()呼び出しは、long_running_taskが完了するまでメインプロセスをブロックします
        start_time = time.time()
        result = pool.apply(long_running_task, args=(3,)) # 3秒間スリープするタスク
        end_time = time.time()
        
        print(f"メインプロセス: apply()が完了しました。結果: {result}")
        print(f"メインプロセス: 処理にかかった時間: {end_time - start_time:.2f}秒")

    print("メインプロセス: 全ての処理が終了しました。")
  • 出力を見ると、apply()の呼び出し後、long_running_taskが「ワーカープロセス終了」と表示するまで、メインプロセスの次のprint文が実行されないことがわかります。そして、処理にかかった時間が約3秒になることから、apply()がブロッキングであることが確認できます。
  • メインプロセスでapply()を呼び出す前と後にタイムスタンプを取り、その差を計算します。
  • long_running_taskは、指定された秒数だけスリープする関数です。


apply_async(): 非同期実行とコールバック

apply()がタスクの完了を待ってブロックするのに対し、apply_async()はタスクをワーカープロセスに投入するとすぐに制御を返します。これにより、メインプロセスは他の処理を継続しながら、並行して複数のタスクを実行できます。

特徴

  • 単一タスクの実行: apply()と同様に、一度に一つのタスクを実行します。
  • コールバック関数の指定: タスクが完了したときに自動的に呼び出されるcallback関数や、エラーが発生したときに呼び出されるerror_callback関数を指定できます。これにより、非同期処理の管理が容易になります。
  • AsyncResultオブジェクトを返す: 実行結果はすぐに利用可能にならず、AsyncResultオブジェクトが返されます。結果を取得するには、このオブジェクトのget()メソッドを呼び出す必要があります。get()は結果が利用可能になるまでブロックします。
  • 非ブロッキング: 関数が実行されている間、メインプロセスはブロックされません。

使用例

import multiprocessing
import time
import os

def my_task(number):
    """時間がかかるタスクのシミュレーション"""
    process_id = os.getpid()
    print(f"プロセスID: {process_id} - {number} の処理を開始...")
    time.sleep(2) # 2秒間スリープ
    result = number * number
    print(f"プロセスID: {process_id} - {number} の処理が完了しました。結果: {result}")
    return result

def my_callback(result):
    """タスク完了時に呼び出されるコールバック関数"""
    print(f"** コールバック: タスクが正常に完了しました。結果: {result} **")

def my_error_callback(error):
    """エラー発生時に呼び出されるコールバック関数"""
    print(f"** エラーコールバック: エラーが発生しました: {error} **")

if __name__ == '__main__':
    print("メインプロセス開始")

    with multiprocessing.Pool(processes=3) as pool:
        # 非同期でタスクを投入
        # result1_async は AsyncResult オブジェクト
        result1_async = pool.apply_async(my_task, args=(5,), callback=my_callback, error_callback=my_error_callback)
        result2_async = pool.apply_async(my_task, args=(10,), callback=my_callback, error_callback=my_error_callback)
        result3_async = pool.apply_async(my_task, args=(20,), callback=my_callback, error_callback=my_error_callback)

        print("メインプロセスは他の処理を継続中...")
        time.sleep(1) # メインプロセスで何か別の処理をする

        # 結果を待機して取得
        # .get() は結果が利用可能になるまでブロックする
        try:
            final_result1 = result1_async.get()
            final_result2 = result2_async.get()
            final_result3 = result3_async.get()
            print(f"最終結果: {final_result1}, {final_result2}, {final_result3}")
        except Exception as e:
            print(f"結果取得中にエラー: {e}")

    print("メインプロセス終了")

map(): 複数の要素への関数適用(同期)

map()メソッドは、Pythonの組み込み関数map()に似ていますが、複数のワーカープロセスで並列に実行されます。同じ関数を異なる入力データのリストに適用する場合に非常に効率的です。

特徴

  • 効率的なデータ分割: chunksize引数を指定することで、大きなイテラブルをワーカープロセスに効率的に分散できます。
  • 結果の順序保証: 入力リストの順序と、対応する結果の順序は保証されます。
  • ブロッキング: apply()と同様に、すべての入力データに対する関数の実行が完了し、結果がすべて揃うまでメインプロセスはブロックされます。

使用例

import multiprocessing
import time
import os

def square_and_sum(number):
    """数値の二乗と、それに10を足した値を返す"""
    process_id = os.getpid()
    print(f"プロセスID: {process_id} - {number} を処理中...")
    time.sleep(0.5)
    return number * number + 10

if __name__ == '__main__':
    print("メインプロセス開始")

    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    with multiprocessing.Pool(processes=4) as pool:
        print("\n--- map() を使って複数のタスクを同期的に実行 ---")
        # map() は全てのタスクが完了するまでブロックします
        results = pool.map(square_and_sum, numbers)
        print(f"全てのタスクが完了しました。結果: {results}")

    print("メインプロセス終了")

解説: map()は、numbersリストの各要素に対してsquare_and_sum関数を並列に適用し、すべての結果が計算されるまで待機します。結果は、入力と同じ順序でリストとして返されます。

map_async(): 複数の要素への関数適用(非同期)

map_async()map()の非同期版です。タスクを投入するとすぐに制御を返し、結果はAsyncResultオブジェクトを通じて後で取得します。

特徴

  • 結果の順序保証: map()と同様に、結果の順序は保証されます。
  • コールバック関数の指定: apply_async()と同様に、callbackerror_callbackを指定できます。
  • AsyncResultオブジェクトを返す: 結果を取得するには、返されたAsyncResultオブジェクトのget()メソッドを呼び出します。
  • 非ブロッキング: apply_async()と同様に、タスクを投入した後もメインプロセスは継続して実行できます。

使用例

import multiprocessing
import time
import os

def cube(number):
    """数値の3乗を計算する"""
    process_id = os.getpid()
    print(f"プロセスID: {process_id} - {number} の3乗を計算中...")
    time.sleep(1)
    return number ** 3

def all_done_callback(results_list):
    """map_async の全てのタスク完了時に呼び出されるコールバック"""
    print(f"\n** 全てのタスクが非同期で完了しました!最終結果: {results_list} **")

if __name__ == '__main__':
    print("メインプロセス開始")

    numbers = [1, 2, 3, 4, 5, 6]

    with multiprocessing.Pool(processes=3) as pool:
        print("\n--- map_async() を使って複数のタスクを非同期に実行 ---")
        # map_async() はすぐに AsyncResult オブジェクトを返す
        async_result = pool.map_async(cube, numbers, callback=all_done_callback)

        print("メインプロセスは、map_async が進行中に他の処理を継続中...")
        time.sleep(2) # メインプロセスで別の処理を行う

        # 必要であれば、結果が完了するまで待機して取得
        # .get() は結果が利用可能になるまでブロックします
        final_cubes = async_result.get()
        print(f"メインプロセスで取得した結果: {final_cubes}")

    print("メインプロセス終了")

解説: map_async()async_resultを即座に返し、メインプロセスはスリープなどの他の処理を継続できます。全てのcube関数の実行が完了すると、all_done_callbackが呼び出され、その後async_result.get()で結果を取得できます。

starmap() / starmap_async(): 複数引数のタスク適用

map()が単一引数の関数にしか使えないのに対し、starmap()は各入力要素が関数の引数のタプルである場合に非常に便利です。これにより、複数引数を持つ関数を直接適用できます。

特徴

  • *argsのようにアンパックして関数に渡される。
  • map()/map_async()と同様だが、入力イテラブルの各要素が関数の引数のタプルとして扱われる。

使用例

import multiprocessing
import time
import os

def calculate_product_and_sum(x, y, z):
    """3つの数値の積と和を計算する"""
    process_id = os.getpid()
    print(f"プロセスID: {process_id} - ({x}, {y}, {z}) を処理中...")
    time.sleep(0.5)
    return (x * y * z, x + y + z)

if __name__ == '__main__':
    print("メインプロセス開始")

    # 各要素が関数の引数となるタプルになっているリスト
    data_for_starmap = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

    with multiprocessing.Pool(processes=2) as pool:
        print("\n--- starmap() を使って複数のタスクを同期的に実行 ---")
        # 各タプルが calculate_product_and_sum の引数としてアンパックされる
        results_starmap = pool.starmap(calculate_product_and_sum, data_for_starmap)
        print(f"starmap 結果: {results_starmap}")

        print("\n--- starmap_async() を使って複数のタスクを非同期に実行 ---")
        async_results_starmap = pool.starmap_async(calculate_product_and_sum, data_for_starmap)
        
        print("メインプロセスは starmap_async が進行中に他の処理を継続中...")
        time.sleep(1) # ここで何か他の処理

        final_starmap_results = async_results_starmap.get()
        print(f"starmap_async 結果: {final_starmap_results}")

    print("メインプロセス終了")

解説: starmap()およびstarmap_async()は、data_for_starmapの各タプルをcalculate_product_and_sum(x, y, z)x, y, zにそれぞれマッピングして実行します。

  • map() / map_async() / starmap() / starmap_async():

    • 同じ関数を多数の異なる入力データに適用したい場合(データ並列処理)。
    • apply()をループ内で何度も呼び出すよりも、これらのメソッドを使った方が通常は効率的です。
    • 特に大量のデータがある場合、chunksize引数を使ってタスクの配布を最適化できます。
    • map() / starmap() は同期処理、map_async() / starmap_async() は非同期処理です。
  • apply_async():

    • 非同期的に単一のタスクを1つのワーカープロセスで実行したい場合。
    • タスクの完了を待たずに、メインプロセスで他の処理を続けたい場合。
    • 結果の取得を後回しにしたり、コールバックを使って柔軟に処理したい場合に適しています。
  • apply():

    • 同期的に単一のタスクを1つのワーカープロセスで実行したい場合。
    • 前のタスクが完了するまで次の処理に進まないで良い場合。
    • 非常に単純なデバッグや、処理の流れを厳密に制御したい場合に適しています。