Google TFX 名詞小註解

聽說 Google 喜歡用自己的 tool, 跟外面的世界都不一樣, 不過這方面我們就不深究了. 本篇專門看 Tensorflow Extended (TFX) 這個 platform 包含那些東西. 就算是我們用不到這套 tool, 也能夠從它的架構複雜度, 理解到為何 Machine Learning 的 code size 只佔整套 AI 系統 5% 的原因.

本圖取材自 https://www.tensorflow.org/tfx?hl=zh-tw

其中幾個重點用我的話翻譯如下:

ExampleGenerator – 攝取 (ingest) 資料, 區分成 training set 和 evaluation sets.

StatisticsGen – 產生數據集的統計資料. (最大值, 均值, 缺值….etc.)

SchemaGen – 對於數據集 (dataset) 產生 schema , 各種數據的 type (e.g. floating).

ExampleValidator – 在數據集抓出異常資料.

Transform – feature engineering. 抓出 feature, 進行轉換.

Tuner – 找出最佳 hyper parameter 參數給 Trainer 用

Trainer – 實際訓練 model.

Evaluator – 評估 model 是否比 baseline 好

InfraValidator – 將 ExampleGen 輸出的合格 data 餵給 Trainer, 測試 model 是否能正確運行?

Pusher – 將 InfraValidator 驗證過的 model push 到 deployment 環境.

BulkInferrer – 終於可以做大量的 inference 了.

除了測試數據有沒有問題? model 有沒有問題? 後續還要有追蹤機制, 看看 model 是否不準了? 不準的原因看是 data drift 還是 data skew. 然後線上做更正處理. 總不能老是暫停服務吧! CI/CD 這部分就屬於 MLOP (Machine Learning Operation) Level 2 的範圍. 夭壽的是還有 level 3.

Data drift 包括 feature drift 和 concept drift. 前者像是叫外送的變多了, 所以交通流量和 training model 時已經不同. 後者像是病毒特徵被抓到後, 新的病毒把特徵改了, 所以舊 model 偵測不到. 輸出造成輸入的改變, 所以叫做 concept drift.

Data Skew 是說每個地區的使用習慣不同, 所以某特定區域就特別不適合先前訓練出來的 model. 像是台灣學生有午睡時間, 此時大家都沒有活動, 對世界其他地方就是一種異常. 這種狀況也需要線上監控把問題抓出來, 最後可能是加個 feature (location) 重 train.

一般人都不會想到還需要動態檢測 model 合不合用? 想想這還真是一個巨大的成本.

分散式機器學習小註解

先前學過的 Tensorflow model training, 久而久之也還給老師了. 今天趁颱風假學習分散式的機器學習, 順便把基本功也補一些回來!

需要分散式學習的原因就是 AI (ML) model 太大了, 因此有各式各樣的方法把工作分散給不同的 CPU, GPU, TPU (後續用 NPU 涵蓋之). 分散的方法包括把 training data 分散給大量 NPU, 或是把 model 的不同層拆給不同的 NPU 做. 後者比較沒有參數及時互通的問題就略過不談.

Data 分散出去給不同的 NPU 學習, 最明顯的問題就是大家拿到的 data 不同, 算出來的 gradient 理論上也不同, 那要麼收斂到同一版呢? 解決這個問題的架構 (architecture) 有兩個大方向, 分別為 synchronous 和 Asynchronous 兩種.

Synchronous 架構下, 每個 NPU 都要等其它 NPU 做完一個段落. 然後大家同步一下參數. 同步的方式可能只是將大家得到的數取個平均. 假設原本一個 update grdient 的段落是一個 batch. 現在有 N 個NPU 均分 data, 所以每個 NPU 只要做一個 mini-batch.

mini-batch = batch * strategy.num_replicas_in_sync (e.g. 1/N)

在一台機器有多個 NPU 的時候, 我們稱這個策略為 mirrorstrategy. 因為每個 NPU 做的事情都一樣, 像是照鏡子. 假如我有多台機器, 每一台機器 (worker) 都執行 mirrorstrategy, 此時稱為 multi-worker mirror strategy. 實際上的精神都是一樣的. 就是三個動作:

  1. 初始化:所有 worker 從相同的初始模型參數開始。
  2. 同步:取一個段落, 每個 worker 把自己的 grafient 傳出去. 每個 worker 看到其他所有 worker 的資料後, 做個計算 (像是取平均). 因為只要有一個 worker 沒更新, 大家都缺一筆資料, 所以不需要中控也可以自動進行.
  3. 參數更新:每個 worker 使用同步的梯度, 以 optimizer 更新其模型參數, 確保所有工作者在每一步都有相同的模型參數。

至於 asynchronous architecture 就沒有互等的機制, 參數 (weight, bias) 會放在一個以上的 parameter server (PS). 大家都去跟它要參數就對了. 等到 worker 算出自己的 gradient, 就把它傳給 PS, PS 負責用大家的 gradient 更新出新的參數給下一個 worker 抓取. 講義原文如下:

Each worker independently fetches the latest parameters from the parameter servers and computes gradients based on a subset of training samples. It then sends the gradients back to the parameter server, which then updates its copy of the parameters with those gradients.

由 Asynchronous 的架構會有大量的資料要傳遞 (weight, bias), 所以適合用在參數大多是 0 的 sparse model. 而參數大多不是 0 的 dense model (如 BERT, GPT) 就更適合 synchronous architecture. 言下之意是傳的時候多少會做壓縮吧!

MirroredStrategy 的 sample code 如下, 藍色字是和普通 training 不一樣的地方. 特別留意是 model 這行退縮到 with strategy.scope(): 的下一層.

import tensorflow as tf
import tensorflow_datasets as tfds

# Load the dataset

datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

# Define the distribution strategy
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

# Set up the input pipeline
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# Build and compile the model within the strategy scope
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=['accuracy'])

# Train the model
model.fit(train_dataset, epochs=10, validation_data=test_dataset)

Model.pb 小註解

這個檔案是用來放 model 的 protocol buffer, 所以叫做 pb. 當我們對 input raw data 做了一些前處理, 然後才去 train model. 那使用這個 model 的天命任意人, 怎麼知道要用那些 pre-processing 的手法來處理他的真實資料呢? 沒錯! 就是去看這個 model.bp.

Model.bp 不只是記錄 metadata 這麼簡單, 它只是三個主要功能之一. 這三個功能分別是:

  1. Model Graph: 把 model 的 graph 記下來. 這個 graph 不是個圖檔, 是資料結構裡面的 graph, 它定義了 model 的資料流向.
  2. Metadata: 主要是 input 和 output 的 tensor information. 不論是做前處理或是後處理都會參考到.
  3. Portability: Model 作者最初可能是用 Tensorflow, Tensorflow Lite, 或者Tensorflow.JS 開發, 使用者 deploy 的 platform 不同於作者時, 也可以參考這個檔案獲得相容性資訊.

講到前處理, 另一個 keyword 是 tf.Transform. 這個 library 專門做 Tensorflow 的前處理用. Graph 也是它產生的. tf.Transform 整合了 Apache Beam, (Google) cloud Dataflow, 和 TensorFlow 兩種不同的處理方式 [1][2].

請留意 tf.Transform 是在 pre-process 階段做 feature engineering, 而 Apache Beam 和 Cloud Dataflow 是在 feature creation 階段. 所以後者提供 API 給 tf.Transform 使用. 第三個可以執行 feture engineering 的階段是 train model. 在 [1] 裡面寫了 1,2,3 三個圈圈, 順序和我提到的相反, 但是那沒關係, 只是做個區隔.

附帶一提 Beam 可以用更多的 programming language, 像是 Java, Python, 和 SQL. Tensorflow 基本上多了 C++, Javascript, 少了 SQL. 總之學 Python 就對了.

[REF]

  1. https://ithelp.ithome.com.tw/articles/10227075
  2. https://www.tensorflow.org/tfx/transform/get_started
  3. https://towardsdatascience.com/hands-on-apache-beam-building-data-pipelines-in-python-6548898b66a5

Feature Engineering 備忘小筆記

對 AI 有興趣的我, 其實比較喜歡研究 model 的原理, 對於資料的處理沒興趣. 不過偶爾要用到某個語法, 卻又記不起來的話, 還是會有點傷腦筋! 所以筆記一下加深印象. 以後也好查詢.

  1. dataframe 轉 dataset
def df_to_dataset(dataframe):
    dataframe = dataframe.copy()
    
    labels = dataframe.pop('your target')
    ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
                            
    return ds

2. Categorical 轉 Numeral (one-hot)

from tensorflow import feature_column as fc

# invest_df is predefined dataframe

A = ['stock','bond','ETF']
B = []

for C in A:
    D = invest_df[C].unique()
    E = fc.categorical_column_with_vocabulary_list(C, D)
    F = fc.indicator_column(E)
    B.append(F)

3. Bucketized 轉 numerical

G = fc.numeric_column("net_asset")
​
# Bucketized cols
H = fc.bucketized_column(G, boundaries=[10, 20, 30, 40, 50, 60, 80, 100]) # in million USD
B.append(H)

4. Feature Cross (Bucketized + Categorical)

I = invest_df['FATFIRE_proximity'].unique()
J = fc.categorical_column_with_vocabulary_list('FATFIRE_proximity',I)
​
crossed_feature = fc.crossed_column([H, I],hash_bucket_size=1000)
crossed_feature = fc.indicator_column(crossed_feature)
B.append(crossed_feature)

5. 實際運用

# input_dim = 上述的 feature 個數, 此時 = 8
# 假設下一層是 12 nodes.
# 8 x 12 是 fully connected.

feature_layer = tf.keras.layers.DenseFeatures(B, dtype='float64')
​
model = tf.keras.Sequential([
  feature_layer,
  layers.Dense(12, input_dim=8, activation='relu'),
  layers.Dense(8, activation='relu'),
  layers.Dense(1, activation='linear',  name='your target')
])

6. 產生新的 Bucketetized feature 做成 Feature Cross


    M = np.linspace(0, 1, nbuckets).tolist()
    N = np.linspace(0, 1, nbuckets).tolist()

    OP = fc.bucketized_column(B['OPEN_PRICE'], M)
    CP = fc.bucketized_column(B['CLOSE_PRICE'], M)
    OV = fc.bucketized_column(B['OPEN_VOL'], N)
    CV = fc.bucketized_column(B['CLOSE_VOL'], N)

    OO = fc.crossed_column([OP, OV], nbuckets * nbuckets)
    CC = fc.crossed_column([CP, CV], nbuckets * nbuckets)

    new_bucket_cross_feature = fc.crossed_column([OO, CC], nbuckets ** 4)

7. 畫出酷炫流程圖並存檔

tf.keras.utils.plot_model(model, 'model.png', show_shapes=False, rankdir='LR') # or 'TB'

用 Multi-LLM 解釋投資風險

Coursera 有一門新的課 [1], 由該公司老闆 Andrew 介紹 CrewAI 來講課. 主要是講多個 LLM 怎麼應用. 課程不長, 有 Lab, 沒證書. 看在老闆推薦的份上, 我也來蹭一下.

用最簡單的話來講, 它的技術就是叫每個 Agent 執行一個 task. 雖然大家平平都是 LLM, 但是指定了不同的角色, 每個 agent 就會各自專注在它的 task 上, 達到互相幫忙的結果. 當然每個 agent 的排列方式 (hierachy) 會影響他們共事的結果.

可不可 search 網路? 需不需要 human input, 可不可以非同步? 這些在 CrewAI 這家公司的 library 中都可以設定. 每個 agent 透過 memory 互相溝通, 因此即使不指定誰 (agent) 要傳訊息給誰 (other agents), 資料也可以共用.

有個 Lab 很好玩, 就是建立一個 crew 去分析買股票的風險. 它的架構是 Crew 叫 agent 做 task. Task 就只是明訂工作內容 (description) 和預期成果 (expect output), 然後註明給哪個 agent. Agent 要指定 role, goal, backstroy (工作指導), 標記可以用那些 tools? 標記可不可以餵資料給別人 (delegation), log 要多詳細 (verbose).

from crewai import Crew, Process
from langchain_openai import ChatOpenAI

# Define the crew with agents and tasks
financial_trading_crew = Crew(
    agents=[data_analyst_agent, 
            trading_strategy_agent, 
            execution_agent, 
            risk_management_agent],
    
    tasks=[data_analysis_task, 
           strategy_development_task, 
           execution_planning_task, 
           risk_assessment_task],
    
    manager_llm=ChatOpenAI(model="gpt-3.5-turbo", 
                           temperature=0.7),
    process=Process.hierarchical,
    verbose=True
)

Crew kickoff 之後, agent 就會去做事. 至於要做什麼? 寫在 input string 裡, 相當於一個 prompt. 舉例指定用 1000 元去買 Nvidia, 風險承受度中等, 應該如何操作? 在課程的例子中, 因為指定 process 是 hierachy. 所以叫第一個 agent 去做 data analysis, 它有 search 網路的 tool, 因此就會各個網站 search Nvidia 的新聞. 總結出 10 條. 交給下一棒 Trade agent.

Trade agent 的工作是要分析標的物的統計值, 它也有網路工具. 所以它也去找了一堆網站. 總結出 Nvidia 的評價.

Based on the information gathered from various analyst forecasts and recommendations, the average 12-month price target for NVDA is $130.68, with the highest target being $200.00 and the lowest at $90.00. The consensus rating for NVDA is “Strong Buy," supported by 38 buy ratings and 3 hold ratings. The stock has a current price of $135.58. The analysis suggests that there is a potential -3.61% downside from the current price based on the average price target. The historical performance of NVDA shows consistent outperformance relative to the industry.

接一下到了 execution agent. 它有甚麼大膽的創見嗎? 沒有. 即使它收到這麼明顯地看多訊息: Considering the historical performance and analyst forecasts, developing a trading strategy that aligns with the bullish sentiment towards NVDA could be a profitable approach, especially for day trading preferences.

它還是說我要上網查看看, 然後歸納出 5 點結論:

Execution Plan for NVDA:
1. Utilize historical performance data to identify key trends and patterns in NVDA’s stock price movements.
2. Implement a strategy that leverages the ‘Strong Buy’ recommendation and average 12-month price target of $130.68.
3. Monitor market trends and movements closely to capitalize on potential trading opportunities presented by NVDA’s growth potential.
4. Develop a risk management strategy that aligns with the user-defined risk tolerance (Medium) and trading preferences (Day Trading).
5. Regularly review and adjust the execution plan based on new market data and insights to optimize trading outcomes for NVDA.

接著回到 Crew. 它根據風險承受度為 Medium 這個條件, 再上網去跑一輪. 對每個網站的內容做一個小結論. 最後叫 risk management agent 彙總, 結果就是給安全牌 (因為風險承受度不高).

Overall, the risk analysis for NVDA’s trading strategies should focus on understanding the potential risks associated with each strategy, assessing the firm’s risk tolerance, and implementing appropriate safeguards to manage and mitigate risks effectively.

我認為畢竟 Crew 收到的指令就是風險承受度中等而已. 已經預設立場, 不用問 AI 也知道結果. 當我把風險承受度改為 Ultra High 重跑一次. 這次它的結論就變狠了! 建議了一些選擇權策略: Straddle Strategy、Iron Condor Strategy 、Long Call Butterfly Spread Strategy、LEAPS Contracts Strategy 等等.

這告訴我們兩件事.:

  1. CrewAI 使用 multi LLM 的功效很強大. 大家做完自己的事就交給同事 (co-worker), 各司其職. 可以用同一個 LLM 做出一群同事開會的效果!
  2. 你跟 AI 講我風險承受度低, AI 就叫你保守. 你說你不怕死, AI 就叫你玩選擇權. 這些不用問 AI, 應該是問施主你自己就好了.

[REF]

  1. https://www.coursera.org/learn/multi-ai-agent-systems-with-crewai/home/welcome