【PyTorch Distributed Elastic】SubprocessContextで分散トレーニングを理解しよう!


SubprocessContext の役割

  • ライフサイクル管理: SubprocessContext は、ワーカープロセスのライフサイクルを管理し、必要に応じて再起動します。
  • イベント管理: SubprocessContext は、ワーカープロセスの起動、終了、エラーなどのイベントを管理します。
  • ワーカープロセスの管理: ランチャープロセスは、SubprocessContext を使用して、サブプロセスでワーカープロセスを起動し、監視します。

SubprocessContext の使用方法

SubprocessContext は、torch.distributed.elastic.multiprocessing.start_processes 関数を使用して取得できます。この関数は、ワーカープロセスの起動に必要な情報を指定するための引数を受け取ります。

import torch.distributed.elastic.multiprocessing.api as api

def main_worker(rank: int, world_size: int):
    # ワーカープロセスのトレーニングロジックを記述する

context = api.start_processes(
    main_function=main_worker,
    world_size=world_size,
    num_processes_per_machine=1,
    main_process_port=0,
)

# ランチャープロセスのロジックを記述する

上記の例では、start_processes 関数は main_worker 関数を実行するワーカープロセスを 1 つ起動します。main_worker 関数は、rankworld_size 引数を受け取ります。これらの引数は、ワーカープロセスの ID と、全体的なワーカープロセスの数に対応します。

SubprocessContext には、ワーカープロセスの管理と通信に使用できる属性とメソッドがいくつか用意されています。

  • shutdown(): すべてのワーカープロセスをシャットダウンします。
  • wait(): すべてのワーカープロセスが完了するまで待機します。
  • get_tensor(key): 指定されたキーでグローバルストレージからテンソルを取得します。
  • put_tensor(key, tensor): 指定されたキーでテンソルをグローバルストレージに格納します。
  • get_backend(): 使用されている分散バックエンド (Nccl、Gloo など) を取得します。
  • get_master_rank(): マスターワーカープロセスの ID を取得します。
  • get_global_rank(): 全体的なワーカープロセスの中で、現在のワーカープロセスの ID を取得します。
  • get_local_rank(): ランチャープロセスが起動したワーカープロセスの中で、現在のワーカープロセスの ID を取得します。
  • get_world_size(): 全体的なワーカープロセスの数を取得します。
  • get_rank(): ワーカープロセスの ID を取得します。


例 1: 単純な分散トレーニング

この例では、SubprocessContext を使用して、ワーカープロセス間でテンソルを共有する単純な分散トレーニングタスクを実行します。

import torch
import torch.distributed.elastic.multiprocessing.api as api

def main_worker(rank: int, world_size: int):
    # ランダムなテンソルを作成
    tensor = torch.rand(32, 10)

    # グローバルストレージにテンソルを格納
    context.put_tensor('global_tensor', tensor)

    # グローバルストレージからテンソルを取得
    global_tensor = context.get_tensor('global_tensor')

    # 各ワーカープロセスでグローバルテンソルを更新
    global_tensor += rank

    # 更新されたグローバルテンソルをグローバルストレージに格納
    context.put_tensor('global_tensor', global_tensor)

context = api.start_processes(
    main_function=main_worker,
    world_size=4,
    num_processes_per_machine=1,
    main_process_port=0,
)

# ランチャープロセスのロジックを記述する

この例では、各ワーカープロセスはランダムなテンソルを作成し、グローバルストレージに格納します。その後、ワーカープロセスはグローバルストレージからテンソルを取得し、自身のランクで更新します。最後に、更新されたテンソルをグローバルストレージに格納します。

例 2: モデルの分散トレーニング

この例では、SubprocessContext を使用して、モデルの分散トレーニングを実行します。

import torch
import torch.distributed.elastic.multiprocessing.api as api
import torch.nn as nn

class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

def main_worker(rank: int, world_size: int):
    # モデルを作成
    model = SimpleNet()

    # モデルをワーカープロセスのデバイスに配置
    model.cuda(rank)

    # データローダーを作成
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32)

    # 損失関数と最適化アルゴリズムを定義
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # トレーニングループ
    for epoch in range(10):
        for batch, target in dataloader:
            # バッチをワーカープロセスのデバイスに配置
            batch = batch.cuda(rank)
            target = target.cuda(rank)

            # モデルの推論
            output = model(batch)

            # 損失を計算
            loss = criterion(output, target)

            # 勾配を計算
            optimizer.zero_grad()
            loss.backward()

            # 勾配を同期
            context.synchronize()

            # パラメータを更新
            optimizer.step()

context = api.start_processes(
    main_function=main_worker,
    world_size=4,
    num_processes_per_machine=1,
    main_process_port=0,
)

# ランチャープロセスのロジックを記述する

この例では、各ワーカープロセスはモデルを作成し、自身のデバイスに配置します。その後、ワーカープロセスはデータローダーを作成し、トレーニングループに入ります。トレーニングループでは、各ワーカープロセスはバッチを処理し、モデルの推論、損失の計算、勾配の計算を行います。その後、context.synchronize() を使用して勾配を同期し、optimizer.step() を使用してパラメータを更新します。

これらの例は、torch.distributed.elastic.multiprocessing.api.SubprocessContext の基本的な使用方法を示しています。より複雑な分散トレーニングタスクでは、SubprocessContext の他の属性とメソッドを使用することができます。

  • torch.distributed.elastic.multiprocessing.api.SubprocessContext ドキュ


代替方法の選択

SubprocessContext の代替方法を選択する際には、以下の要素を考慮する必要があります。

  • 使いやすさ: 代替方法によっては、SubprocessContext よりも使い方が複雑な場合があります。
  • パフォーマンス: 代替方法によっては、SubprocessContext よりもパフォーマンスが劣る場合があります。
  • 分散バックエンド: 使用する分散バックエンド (Nccl、Gloo など) によって、利用可能な代替方法が異なります。

代替方法の例

以下に、SubprocessContext の代替方法の例をいくつか示します。

  • サードパーティ製のライブラリ: HorovodDeepSpeed などのサードパーティ製のライブラリは、SubprocessContext よりも高機能な分散トレーニング機能を提供する場合があります。
  • torch.distributed.autograd: 分散バックエンドに依存せずに、自動的に勾配を同期することができます。この方法は、シンプルな分散タスクに適しています。
  • torch.distributed.rpc: ランチャープロセスとワーカープロセス間、およびワーカープロセス間で直接 RPC (Remote Procedure Call) を行うことができます。この方法は、SubprocessContext よりも柔軟性が高く、複雑な分散タスクに適しています。

具体的な代替方法

具体的な代替方法は、使用している分散バックエンド、パフォーマンス要件、使いやすさの要件によって異なります。以下に、いくつかのユースケースとそれに適した代替方法を示します。

  • 高パフォーマンスな分散トレーニング: サードパーティ製のライブラリ (Horovod、DeepSpeed など)
  • 柔軟性と制御が必要な分散タスク: torch.distributed.rpc
  • シンプルな分散トレーニング: torch.distributed.autograd

それぞれの代替方法の詳細については、以下のドキュメントを参照してください。