DataLoader 和 Iterator 的差異

在 trace AI 課程中的訓練代碼時, 看到 iterator 和 DataLoader 同場出現時有點暈, 我想說這兩個做的事情不是一樣嗎? 所以花點時間將這個疑惑釐清.

先講結論: Iterator 是 design pattern, DataLoader 是 PyTorch 的 class.

Iterator 做為 design pattern 的好處是: 不用管 dataset 真正長什麼樣子, 每次都都抓出一包 data, 還用不到的先不抓, 等下次抓, 以節省記憶體.

那一包要抓多大呢? 透過 DataLoader 這個 class 取得 dataloader 這個 object.

然後 data_iterator 是個 iterator, iter() 把 dataloader 轉換成可迭代的 object.

first_batch 的 type 取決 Dataset, 主要就是一包 data. 而第一次呼叫 next() 拿到的是第一包, 而不是第二包.

from torch.utils.data import DataLoader, Dataset
...
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
....
data_iterator = iter(dataloader)
first_batch = next(data_iterator)

在 Tensorflow 中, 沒有 DataLoader, 但有 tf.data.Dataset. 效果也是一次抓一包.

import tensorflow as tf

# Create a basic dataset
dataset = tf.data.Dataset.range(10)

# Apply transformations (e.g., batch, shuffle)
batched_dataset = dataset.batch(2)

# Iterate through the dataset
for batch in batched_dataset:
    print(batch)

訓練模型需要多少計算量?

DeepSeek 問世後, 很多人討論它需要多少計算量? 我在此做個小整理.

基本上, 而一個 neural node 至少有 weighting 和 bias 要訓練. 而每層輸出共用一個 bias. 請 DeepSeek R1 想了 17 秒, 整理如下:

  • 對於 Dense layer, 參數 = Node 數 + 1.
  • 對於 convolution layer, 參數 = channel 數 * width * height + 1.
  • 對於 RNN layer, 參數 = (輸入維度 + hidden state 維度 h) * h + h.
  • 對於 LSTM, 參數=RNN * 4.
  • 對於 L 層 transformer, 參數 = L * (12h2 + 13h) * Vh ~= 12Lh2 , 其中 V 是 vocabulary size (詞表). [3]

如果是做研究, 可能要參考 [2] 的表 1.

不管是哪一種 weighting parameter, 只要可以被訓練, 我們就算它一個參數. 至於不能訓練的, 像是 ReLU, 這些都不會列入參數, 只會佔據 memory 和 CPU time.

接下來的問題是每個參數要被算幾次才完成 training. 如果去問 AI 的話, 有些回答真的很離譜, 像是 Gemini 算出來比別人大好幾個數量級; 有些 model 會說要除以 batch size. 實際上 batch size 只影響 gradient 的更新頻率. batch 愈多次應該要乘愈多次而不是除. 所以還是問人類比較可靠 [3], DeepSeek R1 也還行, 只是它想了 128 秒.

FLOP s≈ 6 × 參數數量 × 訓練總 token 總數.

例如 GPT-3 175B 參數, 訓練 300B token, 需要 3.15 * 1023 FLOPS. 和官方公布的 3.14* 1023 FLOPS 接近.

其中 6 來自 forward (矩陣, activation) 2 次計算和 backward (梯度, 參數更新) 的 4 次相加. 至於訓練總 token 數, 其實已經吸收了 batch size 參數. 這邊再找個證據.

根據 [2], 跳過各種英文或數學, 3.3 或 5.1. 裡面都引用了 C = 6NBS. 其中 C = training computing, B = batch size, S = number of parameter updates (訓練總 token 數), and N = non-embedding parameter count (參數數量).

什麼是 non-embedding parameter? 就是所有的 parameter 扣掉 token 和 position embedding . 包括:

  • Dense/Linear layers
  • Attention layers
  • Convolutional layers
  • Batch normalization layers
  • Any other trainable layers except embeddings

另外根據 Chinchilla 法則 [5], 訓練量應為參數的 20 倍. 所以 BS = 20N. 前面提到的 GPT-3 175B, 大約需要 3500 B token 的訓練才達到最佳效果. 它只訓練了 300B token, 理論上還有進步空間. 據說這是因為 GPT-3 發表於 2020 年, 而 Chinchilla 法則發表於 2022 年.

再回到風暴的起點, Deepseek 自稱是用 2.788K (pre-train 2664K) H800 GPU hours train 14.8 T tokens. 然而, 如果用上面的 6ND, 搭配 H800 Spec. 和論文中的數據 [7], MFU (Model FLOPS Utilization) 會超過 100%, 完全不合理.

MFU = 訓練需要的算力 / GPU 提供的算力 = (6 * 14.8T * 671B) / (2664 K Hours * 3600 second/Hours * 3026 TFLOPS) = 205%.

其中 MOE, MLA, MTP 可能有省到算力, 但就算把 MFU 壓到 100% 都解釋不通. 當然也可以說 DeepSeek 說謊 – 短報時數. 解決不了問題就解決人. 哈!

歸納起來, 此時不能用 6ND, 要參考 [8]. 用 “3 * 計算 Deepseek v3 的所有 forward 操作" 來代替 6ND. 如此就可以算出 MFU = 35~45% [6], 這樣起碼是落在合理範圍 (< 60%).

其實 [3] 和 [6] 都寫得很好, [6] 尤其完整. 不過我悶著頭寫了一大段才 reference 到他們, 只好改一改重點, 讓它們當配角了.

附帶一提. GPU 卡上的 RAM size, 決定了要用幾張卡才塞得下整個 model 的參數. Training 時要存 weighting, activation function, optimizer, gradient, 抓個 4 倍. Inference 時需要weighting, activation 和 bias. 抓 2.5 倍. 這是一般狀況, 非特別指DeepSeek.

[REF]

  1. GPT-3, The Model Simply Knows! – Analixa
  2. Scaling Laws for Neural Language Models
  3. AI大模型训练相关参数如何估算?有这一篇就够了
  4. https://zhuanlan.zhihu.com/p/606038646
  5. https://arxiv.org/pdf/2203.15556
  6. 【LLM 專欄】Deepseek v3 的訓練時間到底合不合理?淺談 LLM Training efficiency
  7. https://en.wikipedia.org/wiki/DeepSeek
  8. https://zhuanlan.zhihu.com/p/16445683081

分散式機器學習小註解

先前學過的 Tensorflow model training, 久而久之也還給老師了. 今天趁颱風假學習分散式的機器學習, 順便把基本功也補一些回來!

需要分散式學習的原因就是 AI (ML) model 太大了, 因此有各式各樣的方法把工作分散給不同的 CPU, GPU, TPU (後續用 NPU 涵蓋之). 分散的方法包括把 training data 分散給大量 NPU, 或是把 model 的不同層拆給不同的 NPU 做. 後者比較沒有參數及時互通的問題就略過不談.

Data 分散出去給不同的 NPU 學習, 最明顯的問題就是大家拿到的 data 不同, 算出來的 gradient 理論上也不同, 那要麼收斂到同一版呢? 解決這個問題的架構 (architecture) 有兩個大方向, 分別為 synchronous 和 Asynchronous 兩種.

Synchronous 架構下, 每個 NPU 都要等其它 NPU 做完一個段落. 然後大家同步一下參數. 同步的方式可能只是將大家得到的數取個平均. 假設原本一個 update grdient 的段落是一個 batch. 現在有 N 個NPU 均分 data, 所以每個 NPU 只要做一個 mini-batch.

mini-batch = batch * strategy.num_replicas_in_sync (e.g. 1/N)

在一台機器有多個 NPU 的時候, 我們稱這個策略為 mirrorstrategy. 因為每個 NPU 做的事情都一樣, 像是照鏡子. 假如我有多台機器, 每一台機器 (worker) 都執行 mirrorstrategy, 此時稱為 multi-worker mirror strategy. 實際上的精神都是一樣的. 就是三個動作:

  1. 初始化:所有 worker 從相同的初始模型參數開始。
  2. 同步:取一個段落, 每個 worker 把自己的 grafient 傳出去. 每個 worker 看到其他所有 worker 的資料後, 做個計算 (像是取平均). 因為只要有一個 worker 沒更新, 大家都缺一筆資料, 所以不需要中控也可以自動進行.
  3. 參數更新:每個 worker 使用同步的梯度, 以 optimizer 更新其模型參數, 確保所有工作者在每一步都有相同的模型參數。

至於 asynchronous architecture 就沒有互等的機制, 參數 (weight, bias) 會放在一個以上的 parameter server (PS). 大家都去跟它要參數就對了. 等到 worker 算出自己的 gradient, 就把它傳給 PS, PS 負責用大家的 gradient 更新出新的參數給下一個 worker 抓取. 講義原文如下:

Each worker independently fetches the latest parameters from the parameter servers and computes gradients based on a subset of training samples. It then sends the gradients back to the parameter server, which then updates its copy of the parameters with those gradients.

由 Asynchronous 的架構會有大量的資料要傳遞 (weight, bias), 所以適合用在參數大多是 0 的 sparse model. 而參數大多不是 0 的 dense model (如 BERT, GPT) 就更適合 synchronous architecture. 言下之意是傳的時候多少會做壓縮吧!

MirroredStrategy 的 sample code 如下, 藍色字是和普通 training 不一樣的地方. 特別留意是 model 這行退縮到 with strategy.scope(): 的下一層.

import tensorflow as tf
import tensorflow_datasets as tfds

# Load the dataset

datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

# Define the distribution strategy
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

# Set up the input pipeline
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# Build and compile the model within the strategy scope
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])

# Train the model
model.fit(train_dataset, epochs=10, validation_data=test_dataset)