串流 tables 的運作方式
串流 table 是一般 Delta table,可額外支援串流或累加數據處理。
串流 tables 是資料引入的好選擇,原因如下:
- 每個輸入數據列只會處理一次,這會建立絕大多數擷取工作負載的模型(也就是說,藉由將數據列附加或向上插入至 table)。
- 它們可以處理僅限附加數據的大型 volumes。
串流 tables 也是低延遲串流轉換的絕佳選擇,原因如下:
- 數據列和時間範圍的原因
- 處理大量 volumes 數據
- 低延遲
下圖說明串流 tables 的運作方式。
顯示串流運作方式的圖表
串流 tables 是由單一 Delta Live Tables 管道定義並更新。 當您建立 Delta Live Tables 管線時,您可以在管線的原始碼中明確定義串流 tables。 標記為 tables 的這些項目接著由該流水線定義,並且不能被任何其他流水線更改或更新。 當您在 Databricks SQL 中建立串流 table 時,Databricks 會建立 Delta Live Tables 管線,以用來 update 這個 table。
用於匯入的流式傳輸 tables
串流 tables 專為僅附加數據源而設計,且只會處理輸入一次。
完整 refresh 會讓串流 tables 重複處理已處理過的數據。 完整的 refresh 動作會讓串流 table 重新處理所有輸入,包括先前已處理過的輸入。
下列範例示範如何使用串流 table 從雲端記憶體擷取新的檔案。 當您在數據集定義中使用一或多個 spark.readStream
調用時,它會導致 Delta Live Tables 將數據集視為串流 table,而不是具體化檢視。
import dlt
@dlt.table
def raw_customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
下圖說明只能追加串流 tables 的運作原理。
顯示僅可附加的串流運作方式的圖表
串流 tables 和低延遲串流
串流 tables 是專為在有限狀態下進行低延遲串流而設計的。 串流 tables 使用 RocksDB 進行檢查點管理,使其非常適合低延遲串流。 不過,預期資料流要麼是自然界定,要麼是以 watermark界定。
自然系結數據流是由具有妥善定義開始和結束的串流數據源所產生。 自然系結數據流的範例是從檔案目錄讀取數據,where 放置初始檔案批次之後,不會新增任何新檔案。 數據流會被視為限定,因為檔案數目有限,然後,數據流會在處理所有檔案之後結束。
您也可以使用 watermark 系結數據流。 Spark 結構化串流中的 watermark 是一種機制,可藉由指定系統應該等待延遲事件的時間長度,再將時間 window 視為完成,來協助處理延遲數據。 沒有 watermark 的未系結數據流可能會導致 Delta Live Tables 管線因為記憶體壓力而失敗。
串流快照集聯結
串流快照集聯結是數據流與數據流啟動時快照集的維度之間的聯結。 在數據流啟動之後,如果維度變更,這些聯結不會重新計算,因為維度 table 被視為一個時間點的快照,除非重新載入或 refresh 維度 table,否則不會反映數據流啟動後維度 table 的變更。 如果您可以在 join中接受小差異,則這是合理的行為。 例如,當交易數目比客戶數目多很多個數量級時,大約 join 是可接受的。
在以下程式碼範例中,我們對維度 join 進行了處理,涉及 table客戶維度,其中有兩列來自不斷增長的數據集,交易資料。 我們將這兩個數據集之間的 join 具體化為稱為 sales_report
的 table。 請注意,如果外部過程藉由新增數據列來更新客戶 table(customer_id=3, name=Zoya
),則此新增數據列將不會出現在 join 中,因為在啟動數據流時,靜態維度 table 已被拍攝快照。
import dlt
@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
return spark.read.table("customers")
@dlt.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return (
facts.join(dims, on="customer_id", how="inner"
)
串流 table 限制
串流 tables 有下列限制:
- 有限的演進: 您可以變更查詢,而不需重新計算整個數據集。 由於串流 table 只會看到一次數據列,因此您可以在不同的數據列上執行不同的查詢。 這表示您必須注意數據集上執行的所有舊版查詢。 需要完整 refresh,才能讓串流 table 重新看到已見過的數據。
- 狀態管理: 串流 tables 具有低延遲,因此您必須確保其運行的數據流自然有限界或配備 watermark。
- 聯結不會重新計算: 不同於具體化 views 其結果一律正確,因為它們會自動重新計算,串流中的聯結 tables 在維度變更時不會重新計算。 此特性適用於「快速但錯誤的」案例。