TorchDistributor を使用した分散 PyTorch トレーニング

完了

PyTorch は、TensorFlow などの他のディープ ラーニング フレームワークと同様に、1 台のコンピューター上の複数のプロセッサ (CPU または GPU) にわたってスケーリングするように設計されています。 ほとんどの場合、より高速なプロセッサを搭載したコンピューターを使用してスケール "アップ" するこのアプローチは、適切なトレーニング パフォーマンスを提供します。

ただし、複雑なニューラル ネットワークや大量のトレーニング データを操作する必要がある場合は、複数のワーカー ノード間で処理タスクをスケール "アウト" する Apache Spark 固有の機能が役立つ場合があります。

Azure Databricks では、複数のワーカー ノードを含めることができる Spark クラスターを使用します。 これらのクラスターを最大限に利用するには、TorchDistributor を使用します。これは、クラスター内のノード全体に PyTorch トレーニング ジョブを分散できるオープンソース ライブラリです。 TorchDistributor は Databricks Runtime ML 13.0 以降で使用できます。

PyTorch を使用してモデルを既にトレーニングしている場合は、次の方法で、単一プロセス トレーニングを TorchDistributor を使用した分散トレーニングに変換できます。

  1. 既存のコードを調整する: 分散トレーニングと互換性のある単一ノード トレーニング コードを変更します。 トレーニング ロジックが 1 つの関数内にカプセル化されていることを確認します。
  2. トレーニング関数内でインポートを移動する:一般的な pickle 化エラーを回避するには、import torch などの必要なインポートをトレーニング関数内に配置します。
  3. トレーニング関数を準備する: モデル、オプティマイザー、損失関数、トレーニング ループをトレーニング関数内に含めます。 モデルとデータが適切なデバイス (CPU または GPU) に移動されていることを確認します。
  4. TorchDistributor のインスタンスを作成して実行する:必要なパラメーターを指定して TorchDistributor のインスタンスを作成し、.run(*args) を呼び出して分散トレーニングを開始します。

既存のコードを調整する

まず、分散トレーニングと互換性があるように単一ノードのトレーニング コードを変更する必要があります。 コードを変更するときは、トレーニング ロジックを確実に 1 つの関数内にカプセル化する必要があります。 この関数は、トレーニングを複数のノードに分散するために TorchDistributor によって使用されます。

import torch.nn as nn

class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(10, 1)
    
    def forward(self, x):
        return self.fc(x)

これで、torch.utils.data.DataLoader を使用して PyTorch と互換性のある形式のデータセットを準備できるようになりました。

# Sample data
inputs = torch.randn(100, 10)
targets = torch.randn(100, 1)

# Create dataset and dataloader
from torch.utils.data import DataLoader, TensorDataset
dataset = TensorDataset(inputs, targets)
dataloader = DataLoader(dataset, batch_size=10)

トレーニング関数内でインポートを移動する

一般的な pickle 化エラーを回避するには、import torch などの必要なインポートをトレーニング関数内に配置します。 すべてのインポートをトレーニング関数内に配置すると、関数が複数のノードに分散されている場合でも、必要なすべてのモジュールを確実に使用できるようになります。

トレーニング関数を準備する

モデル、オプティマイザー、損失関数、トレーニング ループをトレーニング関数内に含めます。 モデルとデータが適切なデバイス (CPU または GPU) に移動されていることを確認します。

def train_model(dataloader):
    import torch
    import torch.nn as nn
    from torch.optim import SGD

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleModel().to(device)
    optimizer = SGD(model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()
    
    for epoch in range(10):
        for batch in dataloader:
            inputs, targets = batch
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()

TorchDistributor のインスタンスを作成して実行する

必要なパラメーターを指定して TorchDistributor のインスタンスを作成し、.run(*args) を呼び出して分散トレーニングを開始します。 TorchDistributor を実行すると、トレーニング タスクは複数のノードに分散されます。

from pyspark.ml.torch.distributor import TorchDistributor

# Distribute the training
distributor = TorchDistributor(num_workers=4)
distributor.run(train_model, dataloader)

トレーニング ジョブを監視して評価する

組み込みのツールを使用して、CPU または GPU の使用率、メモリ使用率などのクラスターのパフォーマンスを監視できます。 トレーニングが完了したら、PyTorch 評価手法を使用して検証データセットまたはテスト データセット上のモデルを評価し、モデルのパフォーマンスを評価できます。

# Evaluate the model (after distributed training is complete)
model.eval()
with torch.no_grad():
    for inputs, targets in dataloader:
        outputs = model(inputs)
        # Perform evaluation logic

ヒント

詳細については、TorchDistributor を使用した分散トレーニングに関する記事を参照してください。