使用 TorchDistributor 分配 PyTorch 训练

已完成

与其他深度学习框架(如 TensorFlow)一样,PyTorch 的设计目的是在单台计算机上跨多个处理器(CPU 或 GPU)进行缩放。 在大多数情况下,这种使用具有更多或更快处理器的计算机进行纵向扩展的方法提供了足够的训练性能。

但在需要使用复杂的神经网络或大量训练数据时,你可以受益于 Apache Spark 可以跨多个工作器节点横向扩展处理任务的固有功能。

Azure Databricks 使用可包含多个工作器节点的 Spark 群集。 要充分利用这些群集,可以使用开放源代码库 TorchDistributor,它支持你跨群集中的节点分配 PyTorch 训练作业。 TorchDistributor 在 Databricks Runtime ML 13.0 及更高版本上可用。

使用 PyTorch 训练模型后,可以通过以下方法将单进程训练转换为分布式训练:

  1. 调整现有代码:修改单节点训练代码以与分布式训练兼容。 确保训练逻辑封装在单个函数中。
  2. 在训练函数中移动导入:在训练函数内部放置必要的导入(例如 import torch)以避免常见的序列化错误。
  3. 准备训练函数:在训练函数中包括模型、优化器、损失函数和训练循环。 确保将模型和数据移动到相应的设备(CPU 或 GPU)。
  4. 实例化并运行 TorchDistributor:使用所需参数创建 TorchDistributor 的实例,并调用 .run(*args) 以启动分布式训练。

调整现有代码

首先,需要修改单节点训练代码,以便与分布式训练兼容。 修改代码时,需要确保训练逻辑封装在单个函数中。 此函数由 TorchDistributor 用于跨多个节点分配训练。

import torch.nn as nn

class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(10, 1)
    
    def forward(self, x):
        return self.fc(x)

现在,可以使用 torch.utils.data.DataLoader 准备格式与 PyTorch 兼容的数据集。

# Sample data
inputs = torch.randn(100, 10)
targets = torch.randn(100, 1)

# Create dataset and dataloader
from torch.utils.data import DataLoader, TensorDataset
dataset = TensorDataset(inputs, targets)
dataloader = DataLoader(dataset, batch_size=10)

在训练函数中移动导入

要避免常见的序列化错误,请在训练函数内部放置必要的导入(例如 import torch)。 将所有导入放置在训练函数中可确保在跨多个节点分配函数时,所有必需的模块均处于可用状态。

准备训练函数

在训练函数中包括模型、优化器、损失函数和训练循环。 确保将模型和数据移动到相应的设备(CPU 或 GPU)。

def train_model(dataloader):
    import torch
    import torch.nn as nn
    from torch.optim import SGD

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleModel().to(device)
    optimizer = SGD(model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()
    
    for epoch in range(10):
        for batch in dataloader:
            inputs, targets = batch
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            loss.backward()
            optimizer.step()

实例化并运行 TorchDistributor

使用所需参数创建 TorchDistributor 的实例,并调用 .run(*args) 以启动分布式训练。 运行 TorchDistributor 可跨多个节点分配训练任务。

from pyspark.ml.torch.distributor import TorchDistributor

# Distribute the training
distributor = TorchDistributor(num_workers=4)
distributor.run(train_model, dataloader)

监视和评估训练作业

可以使用内置工具来监视群集的性能,包括 CPU 或 GPU 使用情况以及内存利用率。 训练完成后,可以使用 PyTorch 评估技术在验证或测试数据集上评估模型,以评估模型的性能。

# Evaluate the model (after distributed training is complete)
model.eval()
with torch.no_grad():
    for inputs, targets in dataloader:
        outputs = model(inputs)
        # Perform evaluation logic