LCEL 與 Agent

LCEL 全名 LangChain Expression Language, 是一種描述 LangChain 架構的語言。最大的特徵就是看到 A = B | C | D 這種表示法 – 說明了 LCEL 串接多個可執行單元的特性,是 LangChain 的進階實現。而 ‘|’ 的理解和 Linux 的 pipe 很像,它是非同步的串接。

舉例來說,一個 LLM 的 LangChain 可以表示為:

chain = prompt | LLM model | Output Parser

依序做好這三件事:有方向性的提示、強大的大語言模型、友善的輸出格式,就可以提升使用者體驗。

但是顯然這樣還不太夠,比方說,LLM 需要查一筆它沒被訓練過的資料,在上面的 pipe 就無法做到。換個例子,我想幫 AI 助理取個名字,它也會說好。但是一轉眼什麼山盟海誓都忘光了!

顯然,我們要有個負責的 agent,把 function call 的能力加進來;而且 call 完之後,還要餵給 LLM 做 “人性化" 的自然語言潤飾。這是一個循環的路徑,直到 AI 判斷它不再需要任何外部工具,就可以結束迴圈,跟使用者報告最終結果了。

那麼這個 code 長什麼樣子? 首先把基本元件準備好:

# 初始化模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# 將工具打包成列表
tools = [get_current_weather] # 以問天氣的函式為例

# 給出配合 function 的 prompt
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "你是天氣助理,請根據工具的結果來回答問題。"),
        ("placeholder", "{chat_history}"),  # 預留給歷史訊息
        ("human", "{input}"), # 真正的輸入
        ("placeholder", "{agent_scratchpad}"), # 於思考和記錄中間步驟
    ]
)

創建 agent

# 創建 Tool Calling Agent
# 將 LLM、Prompt 和 Tools 組合起來,處理 Function Calling 的所有複雜流程
agent = create_tool_calling_agent(llm, tools, prompt)

執行 agent

# 創建執行器, 跑 模型思考 -> 呼叫工具 -> 再次思考 -> 輸出答案的循環
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

設計人機介面

def run_langchain_example(user_prompt: str):
    print(f"👤 用戶提問: {user_prompt}")
    
    # 使用 invoke 運行 Agent,LangChain 會自動管理多輪 API 呼叫
    result = agent_executor.invoke({"input": user_prompt})
    
    print(f"🌟 最終答案: {result['output']}")

測試案例

# 示範一:Agent 判斷需要呼叫工具
run_langchain_example("請問新竹現在的天氣怎麼樣?請告訴我攝氏溫度。")

# 示範二:Agent 判斷不需要呼叫工具,直接回答
run_langchain_example("什麼是 LLM?請用中文簡短回答。")

這邊要留意的是,第一個問天氣的問題,LLM 顯然要用到 tool 去外面問,第二個問題不需要即時的資料,所以它自己回答就好。

意猶未盡的讀者可能好奇 function 長怎樣? 其實就很一般,主要的特色是用 @tool decorator。這樣可以獲得很多好處,最重要的一點是 agent 都認得它的 JSON 輸出,方便資料異步流動。

# LangChain 會自動將這個 Python 函數轉換成模型可理解的 JSON Schema
from langchain_core.tools import tool

@tool
def get_current_weather(location: str, unit: str = "celsius") -> str:
    if "新竹" in location or "hsinchu" in location.lower():
        return "風大啦! 新竹就是風大啦!"
    else:
        return f"我不知道 {location} 的天氣啦!"

另外,追根究柢的讀者可能想問,code 裡面怎麼沒看到 ‘|’? 它跑那裡去了? 沒錯,上面講的都是 agent,它比較進階,可以動態跑流程。反而 LCEL 只是 LangChain 的實行方式,它就是一個線性的 chain。

我們由奢返儉,回過頭來看 LCEL,它不能跟 agent 比,只能打敗傳統的 LangChain。標為紅色的是 LCEL 特色的 coding style。

def demonstrate_legacy_chain():
 
    # 定義模型
    llm = ChatOpenAI(temperature=0)
    
    # 定義 Prompt Template
    template = "Translate English text to Chinese: {text}"
    prompt = PromptTemplate(template=template, input_variables=["text"])
    
    # 建立 Chain (透過類別組合)
    # 缺點:語法較冗長,看不到資料流向,且 output 通常包含原始 meta data
    chain = LLMChain(llm=llm, prompt=prompt)
    
    # 執行
    input_text = "Make American Great Again!"
    result = chain.run(input_text)

VS

def demonstrate_lcel():
    
    # 定義模型
    model = ChatOpenAI(temperature=0)
    
    # 定義 Prompt Template, 使用更現代的 ChatPromptTemplate
    prompt = ChatPromptTemplate.from_template("Translate English text to Chinese: {text}")
    
    # 定義 Output Parser (將 AI Message 轉為純字串)
    output_parser = StrOutputParser()
    
    # 建立 Chain (使用 Pipe '|' 運算符)
    # 優點:Unix 風格管道,由左至右邏輯清晰,易於修改和擴展
    chain = prompt | model | output_parser
    
    # 執行
    input_text = "Make American Great Again!"
    result = chain.invoke({"text": input_text})

這兩個都是一次性的 Q&A。

  1. Functions, Tools and Agents with LangChain

超學習時代

最近的 Gemini 3 Pro 真的變比較聰明,所以我取消 Monica 的訂閱,改訂 Gemini。現在想要學習最新的技術,不但學校教不了;網路課程也教不了。就算是追著科技網紅,心裡沒有譜的話,也會像個無頭蒼蠅一樣、不會授粉只會傳播細菌,哈!

以 AI 技術來說,訓練模型、微調模型、RAG (檢索增強生成) 都是舊世代的技術。次世代的技術重點在於 Reasoning 和 Agency。雖然這個發展有跡可循、合情合理,但是沒有前面的跌跌撞撞,也絕不可能一步到位。短短一兩年之間,我們有了下面的這些進化。

[觀念改變]

一個 AI model 自己角色扮演 –> 建立認知架構:記憶、規畫、反思。

LangChain –> LangGraph, 線性思考 –> 非線性思考、圖論、立體化。

Funcation call –> Tool call。錯誤檢查和自我校正。

[模型調校]

Supervised Fine-Tuning、 RLHF (reinforcement learning from human feedback) –> DPO (Direct Preference Optimization)、IPO (Identity Preference Optimization)、KTO (Kahneman-Tversky Optimization)。

全能模型 –> SLERP (Spherical Linear Interpolation)、TIES-Merging。

自我對局 (Self-Play) 強化。

[In Model Learning]

Prompt –> DSPy (Declarative Self-improving Language Programs),透過 Compiler 自動尋找 Prompt 組合。

快問快答 –> Chain of Tought、Tree of Thought –> Test-Time Compute (有節制地想久一點,時間換取品質)

[模型評估]

BLEU、ROUGE 考試 –> LLM-as-a-Judge,自動評估

多元評價 – RAGAS

  • R:Retrieval(檢索)
  • A:Accuracy(準確性)
  • G:Generality(通用性)
  • A:Adherence(遵循性)
  • S:Stability(穩定性)

[多模態]

OCR + LLM –> 原生多模態 (Native Multimodel)、audio/video tokenization。

文字到文字 –> any-to-any interaction

上述有很多新的東西,也有一些半新半舊,我打算加強 Agentic、MCP 這方面的知識,然後快速進入 DSPy 的領域。

附帶一提,雖然退訂 Monica 可以省錢,但是我還不打算把 Coursera 停掉。因為上面還是有很多 Andrew Ng 開的短課程。愈長的課程愈容易過時、短課程甚至像  Andrej Karpathy 的 Youtube 都可能有一些最新的東西。至於 ArXiv 肯定好料滿滿,但是可能要叫 NoteLLM 幫我讀了。

DataLoader 和 Iterator 的差異

在 trace AI 課程中的訓練代碼時, 看到 iterator 和 DataLoader 同場出現時有點暈, 我想說這兩個做的事情不是一樣嗎? 所以花點時間將這個疑惑釐清.

先講結論: Iterator 是 design pattern, DataLoader 是 PyTorch 的 class.

Iterator 做為 design pattern 的好處是: 不用管 dataset 真正長什麼樣子, 每次都都抓出一包 data, 還用不到的先不抓, 等下次抓, 以節省記憶體.

那一包要抓多大呢? 透過 DataLoader 這個 class 取得 dataloader 這個 object.

然後 data_iterator 是個 iterator, iter() 把 dataloader 轉換成可迭代的 object.

first_batch 的 type 取決 Dataset, 主要就是一包 data. 而第一次呼叫 next() 拿到的是第一包, 而不是第二包.

from torch.utils.data import DataLoader, Dataset
...
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
....
data_iterator = iter(dataloader)
first_batch = next(data_iterator)

在 Tensorflow 中, 沒有 DataLoader, 但有 tf.data.Dataset. 效果也是一次抓一包.

import tensorflow as tf

# Create a basic dataset
dataset = tf.data.Dataset.range(10)

# Apply transformations (e.g., batch, shuffle)
batched_dataset = dataset.batch(2)

# Iterate through the dataset
for batch in batched_dataset:
    print(batch)

Pytorch 轉 ONNX 小筆記

基本上這個轉換有兩大類方法, 一類是用官方 tool 去轉, 另一類就是寫個 Python 小程式去做. 原先我都是嘗試後面這路, 但要顧慮的東西很多, 一下修語法, 一下 memory 爆掉, 而是默默出錯時也會轉出 model, 要測試過才知道它的智力有沒有受損?搞得滿累的.

當我再次卡在下面這個檔案限制時, 我就決定換方法了 (悔不當初).

RuntimeError: The serialized model is larger than the 2GiB limit imposed by the protobuf library. Therefore the output file must be a file path, so that the ONNX external data can be written to the same directory. Please specify the output file name.

官方做法其實很簡單, 唯一要顧慮的是 onnx, onnxruntime, onnxruntime_genai 這三個軟體有沒有跟系統衝突? 有沒有跟 NPU tool 衝突 ? 這些搞定就可以了. 用 CPU 也不會轉太久. 這次的障礙是 DeepSeek 跟我講錯指令, 下面這行跑起來找不到 builder.

python -m onnxruntime_genai.builder --model microsoft/phi-2 --precision fp16

我去 onnxruntime 安裝的目錄下找, 確實也沒有對應的程式, 所以我把 Monica 預設的 DeepSeek R1 切到提供第二個意見的 Claude Sonnet V3.7, 它就指出 DeepSeek 的錯誤了, 哈!正確指令如下:

python -m onnxruntime_genai.models.builder \
  --model microsoft/phi-2 \
  --precision fp32 \
  --output ./phi-2-onnx \
  --execution_provider cpu \
  --cache_dir ~/.cache/huggingface \
  --extra_options trust_remote_code=True

轉完之後, 當然要測試一下有沒有問題? 如果發現它答非所問, 應該就是轉錯了. 然而, 我發現 DeepSeek R1 寫的測試程式還是遜了一點, 所以我又讓 Claude 重寫一次.

import numpy as np
import onnxruntime as ort
from transformers import AutoTokenizer
from typing import List, Dict, Optional, Tuple
import time

class Phi2ONNXGenerator:
    def __init__(self, model_path: str, tokenizer_path: str = "microsoft/phi-2"):
        """初始化 Phi-2 ONNX 生成器"""
        # 載入分詞器
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        
        # 設定 ONNX 執行選項以優化效能
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.intra_op_num_threads = 4  # 調整為您的 CPU 核心數
        sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
        
        # 建立推理會話
        self.session = ort.InferenceSession(
            model_path, 
            sess_options=sess_options,
            providers=['CPUExecutionProvider']
        )
        
        # 獲取模型輸入輸出資訊
        self.input_names = [input.name for input in self.session.get_inputs()]
        self.output_names = [output.name for output in self.session.get_outputs()]
        
        # 模型常數
        self.num_layers = 32  # Phi-2 有 32 層注意力層
        self.head_dim = 80    # 每個注意力頭的維度
        self.num_heads = 32   # 注意力頭數量
        
        # 快取字首
        self.key_prefix = 'past_key_values.'
        self.key_suffix = '.key'
        self.value_suffix = '.value'

    def _initialize_kv_cache(self, batch_size: int = 1) -> Dict[str, np.ndarray]:
        """初始化 KV 快取為零張量,使用預分配記憶體"""
        kv_cache = {}
        for i in range(self.num_layers):
            k_name = f'{self.key_prefix}{i}{self.key_suffix}'
            v_name = f'{self.key_prefix}{i}{self.value_suffix}'
            
            # 預分配零張量
            kv_cache[k_name] = np.zeros(
                (batch_size, self.num_heads, 0, self.head_dim), dtype=np.float32
            )
            kv_cache[v_name] = np.zeros(
                (batch_size, self.num_heads, 0, self.head_dim), dtype=np.float32
            )
        return kv_cache

    def _prepare_inputs(self, 
                        input_ids: np.ndarray, 
                        attention_mask: np.ndarray, 
                        kv_cache: Optional[Dict[str, np.ndarray]] = None) -> Dict[str, np.ndarray]:
        """準備模型輸入"""
        inputs = {
            'input_ids': input_ids,
            'attention_mask': attention_mask
        }
        
        # 加入 KV 快取(如果提供)
        if kv_cache:
            inputs.update(kv_cache)
            
        return inputs

    def _update_kv_cache(self, outputs, start_idx: int = 1) -> Dict[str, np.ndarray]:
        """從模型輸出更新 KV 快取"""
        kv_cache = {}
        for i in range(self.num_layers):
            k_idx = start_idx + 2*i
            v_idx = start_idx + 2*i + 1
            
            k_name = f'{self.key_prefix}{i}{self.key_suffix}'
            v_name = f'{self.key_prefix}{i}{self.value_suffix}'
            
            kv_cache[k_name] = outputs[k_idx]
            kv_cache[v_name] = outputs[v_idx]
            
        return kv_cache

    def generate(self, 
                prompt: str, 
                max_new_tokens: int = 100,
                temperature: float = 1.0,
                top_k: int = 50,
                top_p: float = 0.9,
                do_sample: bool = True) -> str:
        """生成文本"""
        start_time = time.time()
        
        # 編碼輸入文本
        encoded_input = self.tokenizer(prompt, return_tensors="np")
        input_ids = encoded_input['input_ids'].astype(np.int64)
        attention_mask = encoded_input['attention_mask'].astype(np.int64)
        
        # 初始化 KV 快取
        kv_cache = self._initialize_kv_cache()
        
        # 初始化輸入
        onnx_inputs = self._prepare_inputs(input_ids, attention_mask, kv_cache)
        
        # 保存原始提示的 token IDs
        prompt_ids = input_ids[0].tolist()
        generated_ids = []
        
        # 逐步生成文本
        for i in range(max_new_tokens):
            # 執行推理
            outputs = self.session.run(None, onnx_inputs)
            
            # 獲取 logits
            logits = outputs[0][:, -1, :]  # [batch, vocab_size]
            
            # 應用溫度
            if temperature > 0:
                logits = logits / temperature
            
            # 選擇下一個 token
            if do_sample:
                # Top-K 過濾
                if top_k > 0:
                    indices_to_remove = logits < np.partition(logits, -top_k, axis=-1)[..., -top_k:][..., :1]
                    logits[indices_to_remove] = -float('Inf')
                
                # Top-p (nucleus) 採樣
                if top_p < 1.0:
                    sorted_logits = np.sort(logits, axis=-1)[:, ::-1]
                    cumulative_probs = np.cumsum(np.exp(sorted_logits) / np.sum(np.exp(sorted_logits), axis=-1, keepdims=True), axis=-1)
                    
                    sorted_indices_to_remove = cumulative_probs > top_p
                    sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].copy()
                    sorted_indices_to_remove[:, 0] = False
                    
                    # 將索引轉換回原始順序
                    indices_to_remove = np.zeros_like(logits, dtype=bool)
                    for batch_idx in range(logits.shape[0]):
                        indices_to_remove[batch_idx, np.argsort(-logits[batch_idx])[sorted_indices_to_remove[batch_idx]]] = True
                    
                    logits[indices_to_remove] = -float('Inf')
                
                # 計算概率並採樣
                probs = np.exp(logits) / np.sum(np.exp(logits), axis=-1, keepdims=True)
                next_token = np.random.choice(probs.shape[-1], p=probs[0])
            else:
                # 貪婪解碼
                next_token = np.argmax(logits, axis=-1)[0]
            
            # 終止條件
            if next_token == self.tokenizer.eos_token_id:
                break
                
            # 更新生成的 token 列表
            generated_ids.append(int(next_token))
            
            # 更新輸入
            onnx_inputs['input_ids'] = np.array([[next_token]], dtype=np.int64)
            
            # 更新注意力遮罩
            new_attention_mask = np.ones((1, attention_mask.shape[1] + 1), dtype=np.int64)
            new_attention_mask[0, :attention_mask.shape[1]] = attention_mask[0]
            attention_mask = new_attention_mask
            onnx_inputs['attention_mask'] = attention_mask
            
            # 更新 KV 快取
            kv_cache = self._update_kv_cache(outputs)
            onnx_inputs.update(kv_cache)
        
        # 計算生成時間
        generation_time = time.time() - start_time
        tokens_per_second = len(generated_ids) / generation_time if generation_time > 0 else 0
        
        # 解碼並返回生成的文本
        result = self.tokenizer.decode(generated_ids, skip_special_tokens=True)
        
        print(f"生成了 {len(generated_ids)} 個 tokens,耗時 {generation_time:.2f} 秒 ({tokens_per_second:.2f} tokens/秒)")
        
        return result

# 使用範例
if __name__ == "__main__":
    # 初始化生成器
    generator = Phi2ONNXGenerator(
        model_path='./phi-2-onnx/model.onnx',
        tokenizer_path="microsoft/phi-2"
    )
    
    # 生成文本
    prompt = "find all prime numbers below 120"
    result = generator.generate(
        prompt=prompt,
        max_new_tokens=200,
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )
    
    print(f"\n提示:\n{prompt}")
    print(f"\n生成結果:\n{result}")

DeepSeek R1 給的 inference 程式會寫出大致正確但有錯誤的程式 – 邏輯正確, 但引用函數未定義. 我以為 PHI-2 的極限就是這樣了. 想不到 Claude inference 程式寫得好, 答案竟然也跟著好很多 (雖然還有錯)! 在同樣的 model 下也會有顯著的差異, 令我太意外了.

Claude V3.7 Inference 產生的答案:

import numpy as np

def find_primes(n):
    primes = np.arange(2, n)
    for i in range(2, int(np.sqrt(n))+1):
        primes = primes[primes%i!= 0]
    return primes

print(find_primes(120))

DeepSeek-R1 Inference 產生的答案:

import numpy as np

# Define the upper limit
upper_limit = 120

# Create an array of numbers from 2 to the upper limit
numbers = np.arange(2, upper_limit)

# Use the isprime function to find all prime numbers
primes = numbers[np.vectorize(isprime)(numbers)]

print(primes)

訓練模型需要多少計算量?

DeepSeek 問世後, 很多人討論它需要多少計算量? 我在此做個小整理.

基本上, 而一個 neural node 至少有 weighting 和 bias 要訓練. 而每層輸出共用一個 bias. 請 DeepSeek R1 想了 17 秒, 整理如下:

  • 對於 Dense layer, 參數 = Node 數 + 1.
  • 對於 convolution layer, 參數 = channel 數 * width * height + 1.
  • 對於 RNN layer, 參數 = (輸入維度 + hidden state 維度 h) * h + h.
  • 對於 LSTM, 參數=RNN * 4.
  • 對於 L 層 transformer, 參數 = L * (12h2 + 13h) * Vh ~= 12Lh2 , 其中 V 是 vocabulary size (詞表). [3]

如果是做研究, 可能要參考 [2] 的表 1.

不管是哪一種 weighting parameter, 只要可以被訓練, 我們就算它一個參數. 至於不能訓練的, 像是 ReLU, 這些都不會列入參數, 只會佔據 memory 和 CPU time.

接下來的問題是每個參數要被算幾次才完成 training. 如果去問 AI 的話, 有些回答真的很離譜, 像是 Gemini 算出來比別人大好幾個數量級; 有些 model 會說要除以 batch size. 實際上 batch size 只影響 gradient 的更新頻率. batch 愈多次應該要乘愈多次而不是除. 所以還是問人類比較可靠 [3], DeepSeek R1 也還行, 只是它想了 128 秒.

FLOP s≈ 6 × 參數數量 × 訓練總 token 總數.

例如 GPT-3 175B 參數, 訓練 300B token, 需要 3.15 * 1023 FLOPS. 和官方公布的 3.14* 1023 FLOPS 接近.

其中 6 來自 forward (矩陣, activation) 2 次計算和 backward (梯度, 參數更新) 的 4 次相加. 至於訓練總 token 數, 其實已經吸收了 batch size 參數. 這邊再找個證據.

根據 [2], 跳過各種英文或數學, 3.3 或 5.1. 裡面都引用了 C = 6NBS. 其中 C = training computing, B = batch size, S = number of parameter updates (訓練總 token 數), and N = non-embedding parameter count (參數數量).

什麼是 non-embedding parameter? 就是所有的 parameter 扣掉 token 和 position embedding . 包括:

  • Dense/Linear layers
  • Attention layers
  • Convolutional layers
  • Batch normalization layers
  • Any other trainable layers except embeddings

另外根據 Chinchilla 法則 [5], 訓練量應為參數的 20 倍. 所以 BS = 20N. 前面提到的 GPT-3 175B, 大約需要 3500 B token 的訓練才達到最佳效果. 它只訓練了 300B token, 理論上還有進步空間. 據說這是因為 GPT-3 發表於 2020 年, 而 Chinchilla 法則發表於 2022 年.

再回到風暴的起點, Deepseek 自稱是用 2.788K (pre-train 2664K) H800 GPU hours train 14.8 T tokens. 然而, 如果用上面的 6ND, 搭配 H800 Spec. 和論文中的數據 [7], MFU (Model FLOPS Utilization) 會超過 100%, 完全不合理.

MFU = 訓練需要的算力 / GPU 提供的算力 = (6 * 14.8T * 671B) / (2664 K Hours * 3600 second/Hours * 3026 TFLOPS) = 205%.

其中 MOE, MLA, MTP 可能有省到算力, 但就算把 MFU 壓到 100% 都解釋不通. 當然也可以說 DeepSeek 說謊 – 短報時數. 解決不了問題就解決人. 哈!

歸納起來, 此時不能用 6ND, 要參考 [8]. 用 “3 * 計算 Deepseek v3 的所有 forward 操作" 來代替 6ND. 如此就可以算出 MFU = 35~45% [6], 這樣起碼是落在合理範圍 (< 60%).

其實 [3] 和 [6] 都寫得很好, [6] 尤其完整. 不過我悶著頭寫了一大段才 reference 到他們, 只好改一改重點, 讓它們當配角了.

附帶一提. GPU 卡上的 RAM size, 決定了要用幾張卡才塞得下整個 model 的參數. Training 時要存 weighting, activation function, optimizer, gradient, 抓個 4 倍. Inference 時需要weighting, activation 和 bias. 抓 2.5 倍. 這是一般狀況, 非特別指DeepSeek.

[REF]

  1. GPT-3, The Model Simply Knows! – Analixa
  2. Scaling Laws for Neural Language Models
  3. AI大模型训练相关参数如何估算?有这一篇就够了
  4. https://zhuanlan.zhihu.com/p/606038646
  5. https://arxiv.org/pdf/2203.15556
  6. 【LLM 專欄】Deepseek v3 的訓練時間到底合不合理?淺談 LLM Training efficiency
  7. https://en.wikipedia.org/wiki/DeepSeek
  8. https://zhuanlan.zhihu.com/p/16445683081