PyTorchで複数GPU・複数マシンを使って効率的にニューラルネットワークを訓練する方法とは? DDP徹底解説


DDP の仕組み

DDPは、モデルを複数のプロセスに分割し、各プロセスでモデルの一部をトレーニングします。各プロセスは、割り当てられたバッチのデータに対してモデルを更新し、その後、すべてのプロセスでモデルの勾配を同期します。最終的に、モデルの更新はすべてのプロセスで平均化され、すべてのプロセスに適用されます。

DDP のメリット

  • スケーラビリティ
    DDPは、追加のGPUやマシンを追加することで、簡単にスケールアップすることができます。
  • 大規模なモデルのトレーニング
    DDPを使用すると、単一のGPUではメモリに収まらないような大規模なモデルのトレーニングが可能になります。
  • 高速なトレーニング
    DDPは、複数のGPUやマシンでトレーニングを分散させることで、トレーニング時間を大幅に短縮することができます。

DDP の使用方法

DDPを使用するには、以下の手順が必要です。

  1. 分散環境を初期化する
    torch.distributed モジュールを使用して、分散環境を初期化します。
  2. モデルをラップする
    torch.nn.parallel.DistributedDataParallel を使用して、モデルをラップします。
  3. データローダーをラップする
    torch.utils.data.DistributedSampler を使用して、データローダーをラップします。
  4. トレーニングを実行する
    通常のPyTorchトレーニングコードを実行します。

DDP の例

import torch
import torch.nn as nn
import torch.distributed as dist
import torch.utils.data as data

# 分散環境を初期化する
dist.init_process_group(backend='nccl')

# モデルを定義する
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = nn.Linear(10, 20)
        self.linear2 = nn.Linear(20, 1)

# モデルをラップする
model = MyModel().cuda()
model = DistributedDataParallel(model)

# データローダーをラップする
train_dataset = MyDataset()
train_sampler = DistributedSampler(train_dataset)
train_loader = data.DataLoader(train_dataset, batch_size=32, sampler=train_sampler)

# トレーニングを実行する
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
for epoch in range(10):
    for i, (data, target) in enumerate(train_loader):
        data = data.cuda()
        target = target.cuda()

        output = model(data)
        loss = nn.MSELoss()(output, target)
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, 10, i + 1, len(train_loader), loss.item()))


import torch
import torch.nn as nn
import torch.distributed as dist
import torch.utils.data as data
import torchvision
import argparse

# 引数を設定する
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)
args = parser.parse_args()


# 分散環境を初期化する
dist.init_process_group(backend='nccl', rank=args.local_rank)


# データセットを準備する
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True)
train_sampler = DistributedSampler(train_dataset)
train_loader = data.DataLoader(train_dataset, batch_size=32, sampler=train_sampler)

test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True)
test_loader = data.DataLoader(test_dataset, batch_size=32)


# モデルを定義する
class LeNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.pool2 = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 10)

    def forward(self, x):
        x = self.pool1(self.conv1(x))
        x = self.pool2(self.conv2(x))
        x = x.view(-1, 16 * 4 * 4)
        x = self.fc1(x)
        x = self.fc2(x)
        return x


# モデルをラップする
model = LeNet().cuda()
model = DistributedDataParallel(model)


# 損失関数と最適化アルゴリズムを定義する
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)


# トレーニングを実行する
def train(epoch):
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images = images.cuda()
        labels = labels.cuda()

        output = model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, 10, i + 1, len(train_loader), loss.item()))


# テストを実行する
def test():
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.cuda()
            labels = labels.cuda()

            output = model(images)
            _, predicted = torch.max(output.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print('Accuracy of the network on the 10000 test images: {} %'.format(100 * correct / total))


for epoch in range(10):
    train(epoch)
    test()

このコードは、以下の手順を実行します。

  1. torch.distributed モジュールを使用して、分散環境を初期化します。
  2. torchvision モジュールを使用して、CIFAR-10 データセットを準備します。
  3. LeNet モデルを定義します。
  4. torch.nn.parallel.DistributedDataParallel を使用して、モデルをラップします。
  5. torch.nn.CrossEntropyLosstorch.optim.SGD を使用して、損失関数と最適化アルゴリズムを定義します。
  6. train 関数を定義


torch.nn.parallel.DataParallel

単一マシンで複数のGPUを使用してトレーニングする場合、torch.nn.parallel.DataParallel が DDP の代替手段として有効です。DDP と同様に、モデルを複数の GPU に分散し、トレーニングを並列化します。

しかし、DDP と比較すると、以下の点で劣ります。

  • 通信オーバーヘッド
    DDP は、NCCL などの高速通信ライブラリを使用して、GPU 間の通信を効率化します。一方、DataParallel はより基本的な通信メカニズムを使用するため、通信オーバーヘッドが大きくなる可能性があります。
  • スケーラビリティ
    DDP は、複数のマシンにスケールアップできますが、DataParallel は単一マシンに限定されます。

Horovod

Horovod は、分散ディープラーニングのためのオープンソースライブラリです。Keras、MXNet、PyTorch などの主要な深層学習フレームワークと互換性があり、DDP と同様の機能を提供します。

Horovod の利点は以下の通りです。

  • サポート
    Horovod は、活発なコミュニティと包括的なドキュメントを備えており、問題が発生した場合にサポートを受けやすいです。
  • 柔軟性
    Horovod は、DDP よりも柔軟な API を提供しており、さまざまな分散環境で動作させることができます。

一方、Horovod の欠点は以下の通りです。

  • 互換性
    Horovod は、DDP ほど多くの深層学習フレームワークと互換性がありません。
  • 複雑性
    Horovod は DDP よりも複雑であり、設定と使用に時間がかかる場合があります。

自作の分散トレーニングソリューション

高度な制御と柔軟性を必要とする場合は、torch.distributed モジュールを使用して、独自の分散トレーニングソリューションを構築することもできます。

これは、経験豊富な開発者向けのオプションであり、複雑で時間がかかる可能性があります。

上記以外にも、SimpleML、Ray AI、Microsoft DeepSpeed などの分散トレーニングフレームワークがいくつかあります。

最適な代替手段の選択

最適な代替手段は、特定のニーズと要件によって異なります。

  • 高度な制御と柔軟性 が必要な場合は、独自の分散トレーニングソリューション を構築することを検討してください。
  • 柔軟性とサポート を重視する場合は、Horovod を検討してください。
  • 大規模なモデルのトレーニング複数のマシンにスケールアップ する必要がある場合は、DDP が最良の選択肢です。
  • シンプルで使いやすいソリューション を探している場合は、torch.nn.parallel.DataParallel が良い選択です。

それぞれのオプションを調査し、ニーズに合ったものを選択することが重要です。

torch.nn.parallel.DistributedDataParallel は、PyTorch で分散トレーニングを行うための強力なツールですが、いくつかの代替手段が存在します。