PyTorch Distributed Elasticでジョブを安全に実行:FileTimerClientによるタイムアウト管理


torch.distributed.elastic.timer.FileTimerClient は、PyTorch Distributed Elastic における分散トレーニングにおける ジョブのタイムアウト管理 を行うためのクライアント側クラスです。

このクラスは、ファイルベースのタイマーサーバー と連携し、各ワーカープロセス (ジョブを実行するプロセス) の 実行時間を監視 し、設定された 最大実行時間 を超えると ワーカープロセスを終了 させる役割を担います。

主な機能

  • ファイルベースのタイマーサーバー との通信
  • 最大実行時間を超えた場合の ワーカープロセスの強制終了
  • 設定された 最大実行時間 の監視
  • 各ワーカープロセスにおける 経過時間の計測

利点

  • ジョブの 進行状況を監視 し、問題が発生した場合に迅速な対応を可能にする
  • 長時間実行し続けるワーカープロセスによる リソース枯渇 を防ぐ
  • 分散トレーニングにおけるジョブの 安定性と信頼性 を向上させる

使用方法

torch.distributed.elastic.timer.FileTimerClient を使用する際には、以下の手順に従う必要があります。

  1. タイマーサーバーの起動
    各ホスト上で torch.distributed.elastic.timer.FileTimerServer を起動する必要があります。
  2. クライアントの初期化
    各ワーカープロセス上で torch.distributed.elastic.timer.FileTimerClient を初期化する必要があります。
  3. タイマーの開始
    ジョブの実行前に start() メソッドを呼び出してタイマーを開始する必要があります。
  4. ジョブの実行
    ジョブの実行コードを記述し、start()stop() メソッドの間で実行します。
  5. タイマーの停止
    ジョブの実行完了後に stop() メソッドを呼び出してタイマーを停止する必要があります。

import torch.distributed.elastic as te

# タイマーサーバーの起動 (各ホスト上で実行)
te.timer.FileTimerServer.start(file_path="/tmp/timer.sock", max_interval=60)

# クライアントの初期化 (各ワーカープロセス上で実行)
timer_client = te.timer.FileTimerClient(file_path="/tmp/timer.sock")

# タイマーの開始
timer_client.start()

# ジョブの実行
# ...

# タイマーの停止
timer_client.stop()
  • ワーカープロセスが強制終了されると、ジョブは失敗 します。
  • 設定された最大実行時間は、すべてのワーカープロセスに適用 されます。
  • タイマーサーバーは、各ホスト上で個別に起動 する必要があります。
  • torch.distributed.elastic.timer.FileTimerClient は、単一ノードジョブマルチノードジョブ の両方で利用できます。
  • 代替手段として、torch.distributed.elastic.heartbeat モジュールを使用してワーカープロセスの健全性を監視することもできます。
  • torch.distributed.elastic.timer.FileTimerClient は、比較的新しい機能であり、まだ発展途上です。


タイマーサーバーの起動

import torch.distributed.elastic as te

# タイマーサーバーの起動
te.timer.FileTimerServer.start(file_path="/tmp/timer.sock", max_interval=60)

クライアントの初期化とタイマーの開始

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.distributed.elastic as te

# クライアントの初期化
timer_client = te.timer.FileTimerClient(file_path="/tmp/timer.sock")

# タイマーの開始
timer_client.start()

# データセットとモデルの定義
train_dataset = torchvision.datasets.MNIST(root="~/data", train=True, download=True, transform=torchvision.transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

model = nn.Sequential(
    nn.Linear(784, 128),
    nn.ReLU(),
    nn.Linear(128, 10),
)

# 損失関数と最適化アルゴリズムの定義
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# ジョブの実行
for epoch in range(10):
    for i, (images, labels) in enumerate(train_loader):
        # 画像とラベルをデバイスに転送
        images = images.to(device)
        labels = labels.to(device)

        # 予測と損失計算
        outputs = model(images)
        loss = criterion(outputs, labels)

        # 勾配の計算とパラメータの更新
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            print(f"Epoch {epoch + 1}/{10}, Step {i + 1}/{len(train_loader)}, Loss: {loss.item():.4f}")

# タイマーの停止
timer_client.stop()

実行

上記のコードを実行するには、以下のコマンドを実行します。

python example.py

説明

  1. 最初の部分では、タイマーサーバーを起動します。タイマーサーバーは、各ワーカープロセスの実行時間を監視し、設定された最大実行時間を超えるとワーカープロセスを終了します。
  2. 次の部分では、クライアントを初期化し、タイマーを開始します。クライアントは、タイマーサーバーと通信し、ワーカープロセスの経過時間を報告します。
  3. その後、データセット、モデル、損失関数、最適化アルゴリズムを定義します。
  4. 最後に、ジョブを実行します。ジョブは、10 エポックにわたってトレーニングループを実行します。トレーニングループの各イテレーションで、モデルはバッチの画像に対して予測を行い、損失を計算し、勾配を計算してパラメータを更新します。
  5. ジョブが完了したら、タイマーを停止します。
  • ワーカープロセスが強制終了されると、ジョブは失敗します。
  • 設定された最大実行時間は、すべてのワーカープロセスに適用されます。


torch.distributed.elastic.heartbeat モジュール

torch.distributed.elastic.heartbeat モジュールは、ワーカープロセスの健全性を監視するための機能を提供します。このモジュールを使用して、ワーカープロセスが定期的にハートビートを送信し、他のワーカープロセスが生存していることを確認することができます。ハートビートが送信されなくなったら、ワーカープロセスが失敗したとみなされ、ジョブは停止されます。

torch.distributed.elastic.heartbeat モジュールを使用する利点は以下の通りです。

  • ワーカープロセスの健全性だけでなく、ネットワーク接続の状態も監視できる
  • ファイルベースのタイマーサーバーを使用する必要がない
  • torch.distributed.elastic.timer.FileTimerClient よりも軽量でシンプル

torch.distributed.elastic.heartbeat モジュールの欠点は以下の通りです。

  • ワーカープロセスが長時間実行し続ける場合、リソース枯渇が発生する可能性がある
  • torch.distributed.elastic.timer.FileTimerClient のように、ワーカープロセスを強制終了する機能はない

カスタムタイマー実装

torch.distributed.elastic.timer.FileTimerClient 以外にも、カスタムのタイマー実装を作成することもできます。これにより、独自のタイムアウト管理ロジックを実装し、特定のニーズに合わせた機能を追加することができます。

カスタムタイマーを実装する利点は以下の通りです。

  • 特定のニーズに合わせた機能を追加できる
  • 完全な制御と柔軟性
  • テストとデバッグがより困難
  • torch.distributed.elastic.timer.FileTimerClient よりも複雑で時間のかかる

シグナル処理

シグナル処理を使用して、ワーカープロセスを強制終了することもできます。これは、ワーカープロセスが長時間実行し続ける場合や、メモリ不足などの問題が発生した場合に役立ちます。

シグナル処理を使用する利点は以下の通りです。

  • 他のライブラリやツールとの連携が容易
  • シンプルで使いやすい
  • ワーカープロセスの状態を監視する機能はない
  • プラットフォームによって動作が異なる