PyTorch 分散チェックポイント:効率的なチェックポイント読み込みを実現する LoadPlanner.set_up_planner()
詳細な説明
- 戻り値
LoadPlan
オブジェクトを返します。このオブジェクトには、各ランクにおける読み込みタスクに関する情報が含まれています。
- 処理内容
- チェックポイントファイルのメタデータを読み込み、チェックポイントの内容を分析します。
- 各ランクが読み込むべきデータ量と、読み込み順序を決定します。
- 分散通信プロトコルに基づいて、各ランク間のデータ転送計画を立てます。
- 役割
- 各ランクにおけるチェックポイントファイルの読み込み方法を決定します。
- 分散環境における効率的な読み込みを実現するために、複数のノード間でデータを分割して読み込みます。
- 呼び出しタイミング
torch.distributed.checkpoint.load_state_dict()
関数を実行する前に呼び出されます。
import torch.distributed as dist
def load_checkpoint(checkpoint_path):
# 分散チェックポイントマネージャーを初期化する
dist.init_process_group()
# LoadPlanner を作成する
planner = LoadPlanner()
# チェックポイントの読み込みを開始する
planner.set_up_planner(checkpoint_path)
# 各ランクにおける読み込みタスクを実行する
planner.load()
# 分散チェックポイントマネージャーを終了する
dist.destroy_process_group()
import torch
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
import torch.nn as nn
import torch.optim as optim
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
# Define a toy model
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(10, 1)
# Wrap the model with FSDP
model = ToyModel()
fsdp = FSDP(model)
# Train the model
optimizer = optim.SGD(fsdp.optimizer_params(), lr=0.01)
for _ in range(10):
# Generate some dummy input data
x = torch.randn(10)
# Forward pass
y = model(x)
# Compute loss
loss = y.sum()
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Save the checkpoint
checkpoint_path = "checkpoint.pth"
dcp.save_state_dict(fsdp.model, checkpoint_path)
# Load the checkpoint on multiple processes
world_size = 4
dist.init_process_group(backend="gloo", world_size=world_size)
# Create a LoadPlanner on each process
planners = []
for rank in range(world_size):
planner = dcp.LoadPlanner()
planners.append(planner)
# Set up the LoadPlanner on each process
for rank, planner in enumerate(planners):
planner.set_up_planner(checkpoint_path)
# Load the checkpoint on each process
for rank, planner in enumerate(planners):
planner.load()
# Verify that the model is loaded on each process
for rank, planner in enumerate(planners):
loaded_model = planner.loaded_state_dict["model"]
assert torch.allclose(model.state_dict(), loaded_model.state_dict())
This code first defines a toy model, wraps it with FSDP, trains it, and saves the checkpoint. Then, it initializes a distributed process group and creates a LoadPlanner on each process. The LoadPlanner is then set up to load the checkpoint, and the checkpoint is loaded on each process. Finally, the code verifies that the model is loaded on each process.
- The
loaded_state_dict
attribute of the LoadPlanner contains the loaded state_dict. - The
load()
method is used to load the checkpoint. - The
set_up_planner()
method is used to initialize the LoadPlanner with the checkpoint path. - The
LoadPlanner
is created on each process.
代替方法
torch.distributed.checkpoint.LoadPlanner.set_up_planner()
の代替方法として、以下の方法が考えられます。
手動で読み込み計画を作成する
torch.distributed.checkpoint.load_state_dict()
関数には、map_location
引数があります。この引数を使用して、各ランクが読み込むべきデータ量と、読み込み順序を手動で指定することができます。
import torch
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
def load_checkpoint(checkpoint_path):
# 分散チェックポイントマネージャーを初期化する
dist.init_process_group()
# チェックポイントファイルを読み込む
state_dict = dcp.load_state_dict(
checkpoint_path, map_location=lambda storage, loc: storage.cuda(loc)
)
# 各ランクにおけるデータ量と読み込み順序を**手動で**指定する
for rank, world_size in enumerate(dist.get_world_size()):
# 読み込むべきデータ量を計算する
data_per_rank = len(state_dict) // world_size
start_index = rank * data_per_rank
end_index = min((rank + 1) * data_per_rank, len(state_dict))
# 読み込み順序を**手動で**指定する
for key, value in state_dict.items():
if start_index <= key < end_index:
# 各ランクが読み込むべきデータにアクセスする
state_dict[key] = value
# 分散チェックポイントマネージャーを終了する
dist.destroy_process_group()
カスタムの読み込みロジックを実装する
torch.distributed.checkpoint.load_state_dict()
関数よりも柔軟な読み込みロジックが必要な場合は、カスタムの読み込みロジックを実装することができます。
import torch
import torch.distributed as dist
def load_checkpoint(checkpoint_path):
# 分散チェックポイントマネージャーを初期化する
dist.init_process_group()
# チェックポイントファイルを開く
with open(checkpoint_path, "rb") as f:
# チェックポイントデータを読み込む
checkpoint_data = f.read()
# 各ランクが読み込むべきデータ量と、読み込み順序を**カスタムのロジックで**決定する
for rank, world_size in enumerate(dist.get_world_size()):
# 読み込むべきデータ量を計算する
data_per_rank = len(checkpoint_data) // world_size
start_index = rank * data_per_rank
end_index = min((rank + 1) * data_per_rank, len(checkpoint_data))
# 読み込むべきデータを**カスタムのロジックで**抽出する
rank_data = checkpoint_data[start_index:end_index]
# 各ランクが読み込むべきデータを**カスタムのロジックで**処理する
# ...
# 分散チェックポイントマネージャーを終了する
dist.destroy_process_group()
サードパーティのライブラリを使用する
PyTorch の分散チェックポイント機能以外にも、分散チェックポイントを扱うためのサードパーティのライブラリがいくつか存在します。これらのライブラリは、より柔軟な機能や、より高いパフォーマンスを提供する場合があります。
方法 | メリット | デメリット |
---|---|---|
手動で読み込み計画を作成する | 柔軟性が高い | 複雑で、エラーが発生しやすい |
カスタムの読み込みロジックを実装する | 非常に柔軟性が高い | 非常に複雑で、時間と労力が必要 |
サードパーティのライブラリを使用する | 比較的簡単に使用できる | ライブラリの機能に制限される |