適当なメモブログ

雑なまとめです。話半分で見てください。

Pytorch 並列 DataParallel/DistributedDataParallelについて


執筆途中。あくまでメモなので注意
Pytorchの並列化について。
GAN等の重たいモデルを学習する際や、バッチサイズを大きくしたかったり、学習を高速で終えるために複数のGPUを使いたいときがあります。
そういった場合「並列処理」を使います。

PytorchにはDataParallel と DistributedDataParallelの2つがあります。DDPを使うと学習が早く終わります。
https://i.gyazo.com/170cbb889c0cb87e6346343e8aa2c64e.png
Improvement of DDP is needed! · Issue #463 · ultralytics/yolov5 · GitHub

面倒なので説明は省略しますが、DPだとPythonのGIL(グローバルインタプリタロック)の制限がボトルネックになって遅い為DDPを使うと早くなります。


CPU

# CPU
import torch
import torchvision
import torch.nn as nn

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.n = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
      return self.n(x)
data = torchvision.datasets.MNIST(root= "data", train=True, download=True, transform = torchvision.transforms.ToTensor())
data_loader = torch.utils.data.DataLoader(data, batch_size=64, shuffle=True, num_workers=2, pin_memory=True)

model = NN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for epoch in range(2):
  total_loss = 0
  for imgs, labels in data_loader:
    predict = model(imgs)
    loss = criterion(predict, labels)
    total_loss+= loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"{epoch:3d}: {total_loss:.4f}")
torch.save(model.state_dict(), 'model.pth')

Single GPU

# CPU
model = NN()
predict = model(imgs)

モデルとデータにcuda()をつけるだけ。

# Single GPU
model = NN().cuda()
predict = model(imgs.cuda())
labels = labels.cuda()

DataParallel

# Single GPU
model = NN().cuda()
predict = model(imgs.cuda())
torch.save(model.state_dict(), 'model.pth')

modelをtorch.nn.DataParallelで包んであげるだけ。
デフォルトだと見えるGPU全部使うので、GPU番号を指定してください。

# Multi GPU(DP)
model = NN().cuda()
model = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
predict = model(imgs.cuda())
torch.save(model.module.state_dict(), 'model.pth')

または、python a.pyの前に、CUDA_VISIBLE_DEVICES=0,1,2をつけて見えるGPUを制限するのもいいです。

# Multi GPU(DP)
# CUDA_VISIBLE_DEVICES=0,1,2
model = NN().cuda()
model = torch.nn.DataParallel(model)
predict = model(imgs.cuda())
torch.save(model.module.state_dict(), 'model.pth')

備考として各GPUに送られるバッチサイズは、宣言したバッチサイズ/並列にした個数になります。
また、モデルを保存するときはmoduleを呼び出して上げてください。
理由/忘れた場合 Pytorchのnn.DataParallelを使ったモデルを保存するとloadするときにエラーになる問題 - Qiita

DistributedDataParallel

いくつかあるので分割します。
まず最初におまじないを書きます。

# Multi GPU(DDP)
import os
rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(rank)
world_size = torch.cuda.device_count()
torch.distributed.init_process_group(backend='nccl', init_method='env://', world_size=world_size)

world_sizeは並列処理の個数
rankはそのうちの何個目かを示すものです。

.to(rank)

でGPUにデータを送ってますが、ネットのコードを見てみると「4GPUで8並列」とかやってるコードもありました。そういうことをやる場合は

gpu_id = rank % world_size

とか適当な変数付けて

.to(gpu_id)

とかすると良いです。

# single
data_loader = torch.utils.data.DataLoader(data, batch_size=64, shuffle=True, num_workers=2, pin_memory=True)

data_loaderはsamplerというものを使います。
各epochの最初でset_epochを宣言することを忘れずに。

# Mult GPU(DDP)
sampler = torch.utils.data.distributed.DistributedSampler(data, rank=rank)
data_loader = torch.utils.data.DataLoader(data, batch_size=64, num_workers=2, pin_memory=True, sampler=sampler)
for epoch in range(100):
    sampler.set_epoch(epoch)
# single
model = model.cuda()

DataParallelと同じように、ラップしてあげます。

# Mult GPU(DDP)
model = torch.nn.parallel.DistributedDataParallel(model.to(rank), device_ids=[rank])


モデルを保存するときはDP同様moduleを使うこと。
注意点として、何かしらのTensorの値を全GPUで共有したいときは

torch.distributed.all_reduce(tensor)

を使ってください。
tensorに共有された値が入ります。
呼び出し方は
CUDA_VISIBLE_DEVICES=1,2,3,4 torchrun --nnodes=1 --nproc_per_node=4 hoge.py (args1) (...)
です。

全コード 比較

CPU

# CPU
import torch
import torchvision
import torch.nn as nn

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.n = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
      return self.n(x)
data = torchvision.datasets.MNIST(root= "data", train=True, download=True, transform = torchvision.transforms.ToTensor())
data_loader = torch.utils.data.DataLoader(data, batch_size=64, shuffle=True, num_workers=2, pin_memory=True)

model = NN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for epoch in range(2):
  total_loss = 0
  for imgs, labels in data_loader:
    predict = model(imgs)
    loss = criterion(predict, labels)
    total_loss+= loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"{epoch:3d}: {total_loss:.4f}")
torch.save(model.state_dict(), 'model.pth')

1 GPU

# Single GPU
import torch
import torchvision
import torch.nn as nn

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.n = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
      return self.n(x)
data = torchvision.datasets.MNIST(root= "data", train=True, download=True, transform = torchvision.transforms.ToTensor())
data_loader = torch.utils.data.DataLoader(data, batch_size=64, shuffle=True, num_workers=2, pin_memory=True)

model = NN().cuda()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for epoch in range(2):
  total_loss = 0
  for imgs, labels in data_loader:
    predict = model(imgs.cuda())
    loss = criterion(predict, labels.cuda())
    total_loss+= loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"{epoch:3d}: {total_loss:.4f}")
torch.save(model.state_dict(), 'model.pth')

複数GPU(Data Parallel)

# Multi GPU(DP)
import torch
import torchvision
import torch.nn as nn

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.n = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
      return self.n(x)
data = torchvision.datasets.MNIST(root= "data", train=True, download=True, transform = torchvision.transforms.ToTensor())
data_loader = torch.utils.data.DataLoader(data, batch_size=64*torch.cuda.device_count(), shuffle=True, num_workers=2, pin_memory=True)

model = NN().cuda()
model = torch.nn.DataParallel(model)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for epoch in range(2):
  total_loss = 0
  for imgs, labels in data_loader:
    predict = model(imgs.cuda())
    loss = criterion(predict, labels.cuda())
    total_loss+= loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  print(f"{epoch:3d}: {total_loss:.4f}")
torch.save(model.module.state_dict(), 'model.pth')

複数GPU(Distributed Data Parallel)

# CUDA_VISIBLE_DEVICES={使うGPUのID} torchrun --nnodes 1 --nproc_per_node {使用するGPUの個数} sample.py
# CUDA_VISIBLE_DEVICES=0,1 torchrun --nnodes 1 --nproc_per_node 2 sample.py

# multi GPU(DDP)
import torch
import torchvision
import torch.nn as nn
import os

class NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.n = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
      return self.n(x)

rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(rank)
world_size = torch.cuda.device_count()
torch.distributed.init_process_group(backend='nccl', init_method='env://', world_size=world_size)

data = torchvision.datasets.MNIST(root= "data", train=True, download=True, transform = torchvision.transforms.ToTensor())
sampler = torch.utils.data.distributed.DistributedSampler(data, rank=rank)
data_loader = torch.utils.data.DataLoader(data, batch_size=64, sampler=sampler, num_workers=2, pin_memory=True)

model = NN()
model = torch.nn.parallel.DistributedDataParallel(model.to(rank), device_ids=[rank])
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for epoch in range(2):
  sampler.set_epoch(epoch)
  total_loss = 0
  for imgs, labels in data_loader:
    predict = model(imgs.cuda())
    loss = criterion(predict, labels.cuda())
    total_loss+= loss.item()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
  total_loss = torch.Tensor([total_loss])[0].cuda()
  torch.distributed.all_reduce(total_loss)
  total_loss = total_loss.item()
  if rank == 0:
    print(f"{epoch:3d}: {total_loss:.4f}")
if rank == 0:
    torch.save(model.module.state_dict(), 'model.pth')