在自動載入器中設定結構描述推斷和演進
您可以設定自動載入器來自動偵測已載入資料的結構描述,讓您不需要明確宣告資料結構描述並隨著新資料行導入而演進資料表結構描述,即可初始化資料表。 這樣就不需要在一段時間內手動追蹤和套用結構描述變更。
自動載入器也可以在 JSON Blob 資料行中「解救」非預期的數據(例如,數據類型不同),您稍後 可以使用半結構化的數據存取 API 來選擇存取。
架構推斷和演進支援下列格式:
檔案格式 | 支援的版本 |
---|---|
JSON |
所有版本 |
CSV |
所有版本 |
XML |
Databricks Runtime 14.3 LTS 和更新版本 |
Avro |
Databricks Runtime 10.4 LTS 和更新版本 |
Parquet |
Databricks Runtime 11.3 LTS 及更高版本 |
ORC |
不支援 |
Text |
不適用 (固定架構) |
Binaryfile |
不適用 (固定架構) |
架構推斷和演進的語法
為選項 cloudFiles.schemaLocation
指定目標目錄可啟用結構描述推斷和演進。 您可以選擇使用您為 checkpointLocation
指定的相同目錄。 如果您使用 DLT,Azure Databricks 會自動管理架構位置和其他檢查點資訊。
注意
如果您將多個源數據位置載入目標數據表,則每個自動載入器擷取工作負載都需要個別的串流檢查點。
下列範例會使用 parquet
來作為 cloudFiles.format
。 針對其他檔案來源使用 csv
、 avro
或 json
。 針對每個格式的預設行為,讀取和寫入的其他所有設定都保持不變。
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
自動載入器架構推斷如何運作?
若要在第一次讀取資料時推斷架構,自動載入器會從其探索到的前 50 GB 或 1000 個檔案中取樣,以率先達到的限制為準。 自動載入器會將架構資訊儲存在設定的目錄_schemas
cloudFiles.schemaLocation
中,以便隨時間追蹤輸入數據的架構變更。
注意
若要變更所使用的範例大小,您可以設定 SQL 組態:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(位元串,例如 10gb
)
及
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整數)
依預設,Auto Loader 架構推斷會主動避免因類型不匹配而導致的架構演化問題。 對於未編碼數據類型的格式(JSON、CSV 和 XML),自動載入器會將所有數據行推斷為字串(包括 JSON 檔案中的巢狀字段)。 針對具有具型別架構的格式(Parquet 和 Avro),自動載入器會取樣檔案的子集,並合併個別檔案的架構。 下表摘要說明此行為:
檔案格式 | 默認推斷的數據類型 |
---|---|
JSON |
字串 |
CSV |
字串 |
XML |
字串 |
Avro |
在 Avro 架構中編碼的類型 |
Parquet |
以 Parquet 格式編碼的類型 |
Apache Spark DataFrameReader 會針對架構推斷使用不同的行為,並根據範例數據選取 JSON、CSV 和 XML 來源中數據行的數據類型。 若要使用自動載入器開啟此行為,請將 選項 cloudFiles.inferColumnTypes
設定為 true
。
注意
在推斷 CSV 資料的架構時,自動載入器會假設檔案包含標頭。 如果您的 CSV 檔案不包含標頭,請提供 選項 .option("header", "false")
。 此外,自動載入器會合併範例中所有檔案的架構,以產生全域架構。 然後,自動載入器可以根據其標頭讀取每個檔案,並正確地剖析 CSV。
注意
當數據行在兩個 Parquet 檔案中有不同的數據類型時,自動載入器會選擇最寬的類型。 您可以使用 schemaHints 來覆蓋此選擇。 當您指定架構提示時,自動載入器不會將數據行轉換成指定的類型,而是會指示 Parquet 讀取器將數據行讀取為指定的類型。 在不相符的情況下,資料會被救援在已救援資料欄。
自動載入器架構演進如何運作?
自動加載器在處理您的資料時會偵測到新增新資料行的情況。 當自動載入器偵測到新的數據行時,數據流會以 UnknownFieldException
停止。 在資料流出現此錯誤之前,自動載入器會先對最新的微批次資料執行架構推斷,並以最新的架構更新架構位置,將新欄位合併至架構結尾。 現有數據行的數據類型保持不變。
Databricks 建議使用 Databricks 作業設定自動載入器串流,以在這類架構變更之後自動重新啟動。
自動載入器支援下列架構演進模式,您可以在 選項 cloudFiles.schemaEvolutionMode
中設定:
[模式] | 讀取新數據欄的行為模式 |
---|---|
addNewColumns (預設值) |
串流失敗。 新的欄位會新增至結構。 現有的欄位不會更改數據類型。 |
rescue |
架構永遠不會演進,而且數據流不會因為架構變更而失敗。 所有新的數據行都會記錄在已獲救的數據行中。 |
failOnNewColumns |
串流失敗。 除非更新了提供的架構或移除違規的數據檔,數據流不會重新啟動。 |
none |
不會演進架構、忽略新的數據行,而且除非 rescuedDataColumn 設定選項,否則不會解救數據。 數據流不會因為架構變更而失敗。 |
注意
未提供架構時,addNewColumns
模式是預設值,但當您提供架構時,none
是預設值。 當提供流的架構時,不允許使用addNewColumns
,但如果您將您的架構作為架構提示提供,則可以運作。
分割區是如何在 Auto Loader 中運作的?
如果資料是按照 Hive 風格分區布置的,自動載入會嘗試從資料的底層目錄結構推斷分區欄位。 例如,檔案路徑base_path/event=click/date=2021-04-01/f0.json
會將date
和event
推斷為分區列。 如果基礎目錄結構包含衝突的 Hive 分區或不包含 Hive 樣式分區,則會忽略分區欄。
二進位檔 (binaryFile
) 和 text
檔案格式具有固定的數據架構,但支援數據分割數據行推斷。 Databricks 建議設定 cloudFiles.schemaLocation
這些類型的文件格式。 這可避免任何潛在的錯誤或資訊遺失,並且防止每次自動載入器開始時推斷分區欄位。
分區欄不會被考慮進行模式演進。 如果您的初始目錄結構像 base_path/event=click/date=2021-04-01/f0.json
,然後開始接收新的檔案作為 base_path/event=click/date=2021-04-01/hour=01/f1.json
,自動載入器會忽略小時列。 若要捕捉新分割欄的資訊,請將cloudFiles.partitionColumns
設定為event,date,hour
。
注意
選項 cloudFiles.partitionColumns
接受以逗號分隔的欄位名稱列表。 只有目錄結構中以配對形式存在的欄位會被剖析。
被恢復的數據欄是什麼?
當自動載入器推斷結構時,會自動將復原的資料欄新增至您的結構,顯示為 _rescued_data
。 您可以藉由設定 選項 rescuedDataColumn
來重新命名數據行,或在提供架構的情況下包含該數據行。
被救援的數據欄可確保與結構不相符的數據欄會被救援,而不是被丟棄。 救援數據列包含因下列原因而未剖析的任何資料:
- 架構中遺漏欄位。
- 類型不符。
- 大小寫不符。
已獲救的數據行包含 JSON,其中包含已獲救的數據行和記錄的來源檔案路徑。
注意
剖析記錄時,JSON 和 CSV 剖析器支援三種模式: PERMISSIVE
、 DROPMALFORMED
和 FAILFAST
。 與 rescuedDataColumn
搭配使用時,資料類型不符的狀況不會導致記錄在 DROPMALFORMED
模式中被捨棄,或在 FAILFAST
模式中擲回錯誤。 只會丟棄或拋出錯誤的損毀記錄,例如不完整或格式不正確的 JSON 或 CSV。 如果您在剖析 JSON 或 CSV 時使用 badRecordsPath
,則使用 rescuedDataColumn
時,數據類型不符不會被視為不正確的記錄。 只有不完整且格式不正確的 JSON 或 CSV 記錄會儲存在 中 badRecordsPath
。
變更區分大小寫的行為
除非啟用區分大小寫,否則數據行 abc
、 Abc
和 ABC
會被視為相同的數據行,以便進行架構推斷。 選擇的案例是任意的,取決於取樣的數據。 您可以使用 架構提示 來強制執行應該使用哪一個案例。 選擇項目並推斷架構之後,自動載入器不會考慮那些未選擇而與架構不一致的大小寫變體。
當啟用救援資料欄時,命名方式與架構不同的欄位會載入至_rescued_data
欄。 將選項 readerCaseSensitive
設定為 false 來變更此行為,在此情況下,自動載入器會以不區分大小寫的方式讀取數據。
使用架構提示覆寫架構推斷
您可以使用架構提示來強制執行所知道且預期於推斷架構上的架構資訊。 當您知道資料行屬於特定資料類型,或如果您想選擇更一般的資料類型時(例如 double
而不是 integer
),您可以使用 SQL 結構描述規範語法,以字串形式為資料行數據類型提供任意數量的提示,例如:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
若需查看支援的數據類型列表,請參閱數據類型的文件。
如果數據流開頭沒有數據行,您也可以使用架構提示將該數據行新增至推斷的架構。
以下是推斷架構的範例,可查看具有架構提示的行為。
推斷的架構:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
透過指定下列架構提示:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
您得到:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
在 Databricks Runtime 9.1 LTS 及更高版本中提供陣列和映射架構提示支援。
以下是具有複雜數據類型的推斷架構範例,可查看具有架構提示的行為。
推斷的架構:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
藉由指定下列架構提示:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
您會獲得:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
只有在未提供架構給自動載入器時,才會使用架構提示。 您可以使用架構提示,無論 cloudFiles.inferColumnTypes
是啟用還是停用。