分散訓練のトラブルシューティングにも役立つ! PyTorch Distributed Elasticの_assign_worker_ranks() 関数詳細解説


「torch.distributed.elastic.agent.server.SimpleElasticAgent._assign_worker_ranks()」は、PyTorch Distributed Elasticの「SimpleElasticAgent」クラスにおける重要な関数の一つです。この関数は、分散訓練ジョブにおいて、ワーカープロセスに適切なランクを割り当てる役割を担っています。

役割

分散訓練ジョブを実行する場合、複数のワーカープロセスが協調して学習を行います。各ワーカープロセスは、異なる部分のデータを処理し、互いに通信しながらモデルを更新していきます。適切な訓練を進めるためには、各ワーカープロセスが自身の役割を認識し、連携を取ることが重要です。

「_assign_worker_ranks()」関数は、このワーカープロセスの役割を決定する上で重要な役割を果たします。具体的には、以下の処理を行います。

  1. ワーカー情報の取得
    ジョブ構成情報から、起動するワーカープロセスの数と、各ワーカープロセスが使用するGPUデバイスの情報を取得します。
  2. ランクの割り当て
    取得した情報に基づいて、各ワーカープロセスに0から始まるランクを割り当てます。ランクは、ワーカープロセスの順序を表す番号であり、通信やデータの分割において重要な役割を果たします。
  3. ランク情報の格納
    割り当てられたランク情報は、各ワーカープロセスの環境変数に格納されます。ワーカープロセスは、この環境変数を通じて自身のランク情報にアクセスすることができます。

コード解説

def _assign_worker_ranks(self):
    world_size = len(self.worker_specs)
    local_world_size = len(self._local_worker_specs)

    if local_world_size == 0:
        return

    for i, worker_spec in enumerate(self._local_worker_specs):
        local_rank = i
        global_rank = local_rank + self._global_rank_offset

        worker_spec.rank = global_rank
        os.environ["RANK"] = str(global_rank)
        os.environ["LOCAL_RANK"] = str(local_rank)
        os.environ["WORLD_SIZE"] = str(world_size)
        os.environ["LOCAL_WORLD_SIZE"] = str(local_world_size)

上記のコードは、「_assign_worker_ranks()」関数の主要な部分です。コードを順を追って解説していきます。

    • world_size: 起動するワーカープロセスの総数
    • local_world_size: ローカルノード上で起動するワーカープロセスの数
  1. ローカルワーカー情報の確認

    • local_world_sizeが0の場合、処理をスキップします。これは、ローカルノード上でワーカープロセスが起動しないことを意味します。
  2. 各ワーカープロセスへのランク割り当て

    • i: ワーカープロセスのインデックス
    • local_rank: ローカルノード上でのワーカープロセスのインデックス
    • global_rank: グローバルなワーカープロセスインデックス (local_rank + _global_rank_offset)
  3. ランク情報の格納

    • 割り当てられたランク情報は、各ワーカープロセスのworker_spec属性と、環境変数に格納されます。
      • worker_spec.rank: グローバルランク
      • RANK: グローバルランク
      • LOCAL_RANK: ローカルランク
      • WORLD_SIZE: ワーカープロセスの総数
      • LOCAL_WORLD_SIZE: ローカルノード上でのワーカープロセスの数

「torch.distributed.elastic.agent.server.SimpleElasticAgent._assign_worker_ranks()」関数は、分散訓練ジョブにおいてワーカープロセスに適切なランクを割り当てる重要な役割を担っています。この関数は、各ワーカープロセスの役割を決定し、連携した訓練を可能にする基盤となります。



import os

def _assign_worker_ranks(worker_specs, global_rank_offset=0):
    world_size = len(worker_specs)

    for i, worker_spec in enumerate(worker_specs):
        local_rank = i
        global_rank = local_rank + global_rank_offset

        worker_spec.rank = global_rank
        os.environ["RANK"] = str(global_rank)
        os.environ["LOCAL_RANK"] = str(local_rank)
        os.environ["WORLD_SIZE"] = str(world_size)
        os.environ["LOCAL_WORLD_SIZE"] = str(1)  # ローカルノード上のワーカープロセス数は1と仮定

# サンプルワーカー情報
worker_specs = [
    WorkerSpec(role="worker", local_rank=0, port=12345),
    WorkerSpec(role="worker", local_rank=1, port=12346),
]

# ランク割り当てを実行
_assign_worker_ranks(worker_specs)

# 割り当てられたランク情報の確認
for worker_spec in worker_specs:
    print(f"worker_spec.rank: {worker_spec.rank}")
    print(f"os.environ['RANK']: {os.environ['RANK']}")
    print(f"os.environ['LOCAL_RANK']: {os.environ['LOCAL_RANK']}")
    print(f"os.environ['WORLD_SIZE']: {os.environ['WORLD_SIZE']}")
    print(f"os.environ['LOCAL_WORLD_SIZE']: {os.environ['LOCAL_WORLD_SIZE']}")
    print("------------------")

コードの説明

    • os: ランク情報などを格納する環境変数へのアクセスに使用
  1. _assign_worker_ranks()関数の定義

    • 引数:
      • worker_specs: ワーカー情報を含むリスト
      • global_rank_offset: グローバルランクのオフセット値 (デフォルト: 0)
    • 処理内容:
      • world_size: 起動するワーカープロセスの総数
      • 各ワーカープロセスに対して:
        • local_rank: ローカルノード上でのワーカープロセスのインデックス
        • global_rank: グローバルなワーカープロセスインデックス (local_rank + global_rank_offset)
        • 割り当てられたランク情報を、ワーカー情報と環境変数に格納
  2. サンプルワーカー情報の作成

    • worker_specsリストには、2つのワーカー情報が含まれています。
      • role: ワーカーの役割 ("worker" を指定)
      • local_rank: ローカルノード上でのワーカープロセスのインデックス
      • port: ワーカープロセスが使用するポート番号
  3. ランク割り当ての実行

    • _assign_worker_ranks()関数を実行し、サンプルワーカー情報にランクを割り当て
  4. 割り当てられたランク情報の確認

    • 各ワーカー情報と、環境変数に格納されたランク情報を出力

実行例

worker_spec.rank: 0
os.environ['RANK']: 0
os.environ['LOCAL_RANK']: 0
os.environ['WORLD_SIZE']: 2
os.environ['LOCAL_WORLD_SIZE']: 1
------------------
worker_spec.rank: 1
os.environ['RANK']: 1
os.environ['LOCAL_RANK']: 1
os.environ['WORLD_SIZE']: 2
os.environ['LOCAL_WORLD_SIZE']: 1
------------------


カスタムエージェントを実装する

最も柔軟性の高い方法は、ElasticAgentクラスを継承して独自のカスタムエージェントを実装することです。この方法では、_assign_worker_ranks()関数を独自に実装し、ワーカーのランク割り当てロジックを完全に制御することができます。

利点

  • 特殊な要件への対応: 特殊なネットワーク構成やワーカー配置要件にも対応できます。
  • 完全な柔軟性: ランク割り当てロジックを自由に設計できます。

欠点

  • テストとデバッグ: 実装したロジックのテストとデバッグを自分で行う必要があります。
  • 開発コスト: カスタムエージェントの開発には、追加の労力と時間がかかります。

rankオプションを使用する

SimpleElasticAgentを使用する場合、rankオプションを使用して、ワーカーごとに個別にランクを指定することができます。このオプションを使用すると、_assign_worker_ranks()関数はスキップされ、指定されたランクが各ワーカーに割り当てられます。

利点

  • コード変更の必要なし: SimpleElasticAgentを使用している場合は、コードを変更する必要はありません。
  • シンプルさ: ランク割り当てを個別に設定するだけで済みます。

欠点

  • 複雑な構成への不適: 複雑なネットワーク構成やワーカー配置要件には対応できません。
  • 柔軟性の低さ: ランク割り当てロジックを完全に制御することはできません。

ランク情報を手動で設定する

ワーカープロセス起動前に、各ワーカープロセスの環境変数にRANKとLOCAL_RANKを設定する方法です。この方法は、最もシンプルな方法ですが、スケーラビリティや柔軟性に欠けます。

利点

  • 最もシンプル: コード変更が不要で、設定も簡単です。

欠点

  • エラー発生時の問題: 手動で設定した場合、エラー発生時に問題が発生する可能性があります。
  • 柔軟性の低さ: ランク割り当てロジックを制御することはできません。
  • スケーラビリティの低さ: 大規模なジョブや複雑な構成には向いていません。

代替方法の選択

適切な代替方法は、具体的な要件と状況によって異なります。

  • 非常にシンプルなジョブを実行する場合
    ランク情報を手動で設定する
  • シンプルで使いやすい方法が必要な場合
    rankオプションを使用する
  • 柔軟性と制御が必要な場合
    カスタムエージェントを実装する

「torch.distributed.elastic.agent.server.SimpleElasticAgent._assign_worker_ranks()」には、代替方法として以下の選択肢があります。

  • ランク情報を手動で設定する
  • rankオプションを使用する
  • カスタムエージェントを実装する