PyTorch Distributed Elasticでジョブを安全に実行:FileTimerClientによるタイムアウト管理
torch.distributed.elastic.timer.FileTimerClient
は、PyTorch Distributed Elastic における分散トレーニングにおける ジョブのタイムアウト管理 を行うためのクライアント側クラスです。
このクラスは、ファイルベースのタイマーサーバー と連携し、各ワーカープロセス (ジョブを実行するプロセス) の 実行時間を監視 し、設定された 最大実行時間 を超えると ワーカープロセスを終了 させる役割を担います。
主な機能
- ファイルベースのタイマーサーバー との通信
- 最大実行時間を超えた場合の ワーカープロセスの強制終了
- 設定された 最大実行時間 の監視
- 各ワーカープロセスにおける 経過時間の計測
利点
- ジョブの 進行状況を監視 し、問題が発生した場合に迅速な対応を可能にする
- 長時間実行し続けるワーカープロセスによる リソース枯渇 を防ぐ
- 分散トレーニングにおけるジョブの 安定性と信頼性 を向上させる
使用方法
torch.distributed.elastic.timer.FileTimerClient
を使用する際には、以下の手順に従う必要があります。
- タイマーサーバーの起動
各ホスト上でtorch.distributed.elastic.timer.FileTimerServer
を起動する必要があります。 - クライアントの初期化
各ワーカープロセス上でtorch.distributed.elastic.timer.FileTimerClient
を初期化する必要があります。 - タイマーの開始
ジョブの実行前にstart()
メソッドを呼び出してタイマーを開始する必要があります。 - ジョブの実行
ジョブの実行コードを記述し、start()
とstop()
メソッドの間で実行します。 - タイマーの停止
ジョブの実行完了後にstop()
メソッドを呼び出してタイマーを停止する必要があります。
例
import torch.distributed.elastic as te
# タイマーサーバーの起動 (各ホスト上で実行)
te.timer.FileTimerServer.start(file_path="/tmp/timer.sock", max_interval=60)
# クライアントの初期化 (各ワーカープロセス上で実行)
timer_client = te.timer.FileTimerClient(file_path="/tmp/timer.sock")
# タイマーの開始
timer_client.start()
# ジョブの実行
# ...
# タイマーの停止
timer_client.stop()
- ワーカープロセスが強制終了されると、ジョブは失敗 します。
- 設定された最大実行時間は、すべてのワーカープロセスに適用 されます。
- タイマーサーバーは、各ホスト上で個別に起動 する必要があります。
torch.distributed.elastic.timer.FileTimerClient
は、単一ノードジョブ と マルチノードジョブ の両方で利用できます。
- 代替手段として、
torch.distributed.elastic.heartbeat
モジュールを使用してワーカープロセスの健全性を監視することもできます。 torch.distributed.elastic.timer.FileTimerClient
は、比較的新しい機能であり、まだ発展途上です。
タイマーサーバーの起動
import torch.distributed.elastic as te
# タイマーサーバーの起動
te.timer.FileTimerServer.start(file_path="/tmp/timer.sock", max_interval=60)
クライアントの初期化とタイマーの開始
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.distributed.elastic as te
# クライアントの初期化
timer_client = te.timer.FileTimerClient(file_path="/tmp/timer.sock")
# タイマーの開始
timer_client.start()
# データセットとモデルの定義
train_dataset = torchvision.datasets.MNIST(root="~/data", train=True, download=True, transform=torchvision.transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
model = nn.Sequential(
nn.Linear(784, 128),
nn.ReLU(),
nn.Linear(128, 10),
)
# 損失関数と最適化アルゴリズムの定義
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
# ジョブの実行
for epoch in range(10):
for i, (images, labels) in enumerate(train_loader):
# 画像とラベルをデバイスに転送
images = images.to(device)
labels = labels.to(device)
# 予測と損失計算
outputs = model(images)
loss = criterion(outputs, labels)
# 勾配の計算とパラメータの更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
if i % 100 == 0:
print(f"Epoch {epoch + 1}/{10}, Step {i + 1}/{len(train_loader)}, Loss: {loss.item():.4f}")
# タイマーの停止
timer_client.stop()
実行
上記のコードを実行するには、以下のコマンドを実行します。
python example.py
説明
- 最初の部分では、タイマーサーバーを起動します。タイマーサーバーは、各ワーカープロセスの実行時間を監視し、設定された最大実行時間を超えるとワーカープロセスを終了します。
- 次の部分では、クライアントを初期化し、タイマーを開始します。クライアントは、タイマーサーバーと通信し、ワーカープロセスの経過時間を報告します。
- その後、データセット、モデル、損失関数、最適化アルゴリズムを定義します。
- 最後に、ジョブを実行します。ジョブは、10 エポックにわたってトレーニングループを実行します。トレーニングループの各イテレーションで、モデルはバッチの画像に対して予測を行い、損失を計算し、勾配を計算してパラメータを更新します。
- ジョブが完了したら、タイマーを停止します。
- ワーカープロセスが強制終了されると、ジョブは失敗します。
- 設定された最大実行時間は、すべてのワーカープロセスに適用されます。
torch.distributed.elastic.heartbeat モジュール
torch.distributed.elastic.heartbeat
モジュールは、ワーカープロセスの健全性を監視するための機能を提供します。このモジュールを使用して、ワーカープロセスが定期的にハートビートを送信し、他のワーカープロセスが生存していることを確認することができます。ハートビートが送信されなくなったら、ワーカープロセスが失敗したとみなされ、ジョブは停止されます。
torch.distributed.elastic.heartbeat
モジュールを使用する利点は以下の通りです。
- ワーカープロセスの健全性だけでなく、ネットワーク接続の状態も監視できる
- ファイルベースのタイマーサーバーを使用する必要がない
torch.distributed.elastic.timer.FileTimerClient
よりも軽量でシンプル
torch.distributed.elastic.heartbeat
モジュールの欠点は以下の通りです。
- ワーカープロセスが長時間実行し続ける場合、リソース枯渇が発生する可能性がある
torch.distributed.elastic.timer.FileTimerClient
のように、ワーカープロセスを強制終了する機能はない
カスタムタイマー実装
torch.distributed.elastic.timer.FileTimerClient
以外にも、カスタムのタイマー実装を作成することもできます。これにより、独自のタイムアウト管理ロジックを実装し、特定のニーズに合わせた機能を追加することができます。
カスタムタイマーを実装する利点は以下の通りです。
- 特定のニーズに合わせた機能を追加できる
- 完全な制御と柔軟性
- テストとデバッグがより困難
torch.distributed.elastic.timer.FileTimerClient
よりも複雑で時間のかかる
シグナル処理
シグナル処理を使用して、ワーカープロセスを強制終了することもできます。これは、ワーカープロセスが長時間実行し続ける場合や、メモリ不足などの問題が発生した場合に役立ちます。
シグナル処理を使用する利点は以下の通りです。
- 他のライブラリやツールとの連携が容易
- シンプルで使いやすい
- ワーカープロセスの状態を監視する機能はない
- プラットフォームによって動作が異なる