在自動載入器中設定結構描述推斷和演進
您可以設定自動載入器來自動偵測已載入資料的結構描述,讓您不需要明確宣告資料結構描述並隨著新資料行導入而演進資料表結構描述,即可初始化資料表。 這樣就不需要在一段時間內手動追蹤和套用結構描述變更。
自動載入器也可以在 JSON Blob 資料行中「解救」非預期的數據(例如,數據類型不同),您稍後 可以使用半結構化的數據存取 API 來選擇存取。
架構推斷和演進支援下列格式:
檔案格式 | 支援的版本 |
---|---|
JSON |
所有版本 |
CSV |
所有版本 |
XML |
Databricks Runtime 14.3 LTS 和更新版本 |
Avro |
Databricks Runtime 10.4 LTS 和更新版本 |
Parquet |
Databricks Runtime 11.3 LTS 和更新版本 |
ORC |
不支援 |
Text |
不適用 (固定架構) |
Binaryfile |
不適用 (固定架構) |
架構推斷和演進的語法
為選項 cloudFiles.schemaLocation
指定目標目錄可啟用結構描述推斷和演進。 您可以選擇使用您為 checkpointLocation
指定的相同目錄。 如果您使用 Delta Live Tables,Azure Databricks 會自動管理架構位置和其他檢查點資訊。
注意
如果您將多個源數據位置載入目標數據表,則每個自動載入器擷取工作負載都需要個別的串流檢查點。
下列範例會針對 cloudFiles.format
使用 parquet
。 針對其他檔案來源使用 csv
、 avro
或 json
。 針對每個格式的預設行為,讀取和寫入的其他所有設定都保持不變。
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
自動載入器架構推斷如何運作?
若要在第一次讀取數據時推斷架構,自動載入器會先取樣它探索到的前 50 GB 或 1000 個檔案,但先交叉限制。 自動載入器會將架構資訊儲存在設定為追蹤一段時間輸入數據之架構變更的目錄中_schemas
cloudFiles.schemaLocation
。
注意
若要變更所使用的範例大小,您可以設定 SQL 組態:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(位元串,例如 10gb
)
及
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整數)
根據預設,自動載入器架構推斷會因為類型不符而尋求避免架構演進問題。 對於未編碼數據類型的格式(JSON、CSV 和 XML),自動載入器會將所有數據行推斷為字串(包括 JSON 檔案中的巢狀字段)。 針對具有具型別架構的格式(Parquet 和 Avro),自動載入器會取樣檔案的子集,並合併個別檔案的架構。 下表摘要說明此行為:
檔案格式 | 默認推斷的數據類型 |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
在 Avro 架構中編碼的類型 |
Parquet |
以 Parquet 架構編碼的類型 |
Apache Spark DataFrameReader 會針對架構推斷使用不同的行為,並根據範例數據選取 JSON、CSV 和 XML 來源中數據行的數據類型。 若要使用自動載入器開啟此行為,請將 選項 cloudFiles.inferColumnTypes
設定為 true
。
注意
在推斷 CSV 資料的架構時,自動載入器會假設檔案包含標頭。 如果您的 CSV 檔案不包含標頭,請提供 選項 .option("header", "false")
。 此外,自動載入器會合併範例中所有檔案的架構,以產生全域架構。 然後,自動載入器可以根據其標頭讀取每個檔案,並正確地剖析 CSV。
注意
當數據行在兩個 Parquet 檔案中有不同的數據類型時,自動載入器會選擇最寬的類型。 您可以使用 schemaHints 來覆寫此選項。 當您指定架構提示時,自動載入器不會將數據行轉換成指定的類型,而是會指示 Parquet 讀取器將數據行讀取為指定的類型。 在不相符的情況下,數據行會在已獲救的數據行中獲救。
自動載入器架構演進如何運作?
自動載入器會在處理您的資料時偵測新資料行的新增情況。 當自動載入器偵測到新的數據行時,數據流會以 UnknownFieldException
停止。 在數據流擲回此錯誤之前,自動載入器會先對最新的微批次數據執行架構推斷,並藉由將新數據行合併至架構結尾,以最新的架構來更新架構位置。 現有數據行的數據類型保持不變。
Databricks 建議使用 Databricks 作業設定自動載入器串流,以在這類架構變更之後自動重新啟動。
自動載入器支援下列架構演進模式,您可以在 選項 cloudFiles.schemaEvolutionMode
中設定:
[模式] | 讀取新數據行的行為 |
---|---|
addNewColumns (預設值) |
數據流失敗。 新的數據行會新增至架構。 現有的數據行不會演進數據類型。 |
rescue |
架構永遠不會演進,而且數據流不會因為架構變更而失敗。 所有新的數據行都會記錄在已獲救的數據行中。 |
failOnNewColumns |
數據流失敗。 除非已更新提供的架構,否則數據流不會重新啟動,否則會移除違規的數據檔。 |
none |
不會演進架構、忽略新的數據行,而且除非 rescuedDataColumn 設定選項,否則不會解救數據。 數據流不會因為架構變更而失敗。 |
分割區如何與自動載入器搭配使用?
如果數據配置在Hive樣式數據分割中,自動載入器會嘗試從資料的基礎目錄結構推斷資料分割數據行。 例如,檔案路徑base_path/event=click/date=2021-04-01/f0.json
會產生 和 event
的date
推斷作為數據分割數據行。 如果基礎目錄結構包含衝突的Hive分割區或不包含Hive樣式數據分割,則會忽略資料分割數據行。
二進位檔 (binaryFile
) 和 text
檔案格式具有固定的數據架構,但支援數據分割數據行推斷。 Databricks 建議設定 cloudFiles.schemaLocation
這些文件格式。 這可避免任何潛在的錯誤或資訊遺失,並防止每次自動載入器開始時推斷數據分割數據行。
分割區數據行不會被視為架構演進。 如果您的初始目錄結構類似 base_path/event=click/date=2021-04-01/f0.json
,然後開始接收新的檔案做為 base_path/event=click/date=2021-04-01/hour=01/f1.json
,自動載入器會忽略小時數據行。 若要擷取新資料分割資料行的資訊,請將 設定 cloudFiles.partitionColumns
為 event,date,hour
。
注意
選項 cloudFiles.partitionColumns
會採用以逗號分隔的數據行名稱清單。 只會剖析目錄結構中以配對的形式 key=value
存在的數據行。
獲救的數據行是什麼?
當自動載入器推斷架構時,已獲救的資料行會自動新增至您的架構作為 _rescued_data
。 您可以藉由設定 選項 rescuedDataColumn
來重新命名數據行,或在提供架構的情況下包含該數據行。
已獲救的數據行可確保與架構不相符的數據行會獲救,而不是被卸除。 已獲救的數據列包含因下列原因而未剖析的任何數據:
- 架構中遺漏數據行。
- 類型不符。
- 大小寫不符。
已獲救的數據行包含 JSON,其中包含已獲救的數據行和記錄的來源檔案路徑。
注意
剖析記錄時,JSON 和 CSV 剖析器支援三種模式: PERMISSIVE
、 DROPMALFORMED
和 FAILFAST
。 搭配使用 rescuedDataColumn
時,資料類型不符會導致記錄在 DROPMALFORMED
模式中捨棄或以 FAILFAST
模式擲回錯誤。 只會卸除或擲回損毀的記錄,例如不完整或格式不正確的 JSON 或 CSV。 如果您在剖析 JSON 或 CSV 時使用 badRecordsPath
,則使用 rescuedDataColumn
時,數據類型不符不會被視為不正確的記錄。 只有不完整且格式不正確的 JSON 或 CSV 記錄會儲存在 中 badRecordsPath
。
變更區分大小寫的行為
除非啟用區分大小寫,否則數據行 abc
、 Abc
和 ABC
會被視為相同的數據行,以便進行架構推斷。 選擇的案例是任意的,取決於取樣的數據。 您可以使用 架構提示 來強制執行應該使用哪一個案例。 選取專案並推斷架構之後,自動載入器不會考慮未選取與架構一致的大小寫變體。
啟用獲救的數據行時,在架構以外的案例中命名的欄位會載入數據_rescued_data
行。 將選項 readerCaseSensitive
設定為 false 來變更此行為,在此情況下,自動載入器會以不區分大小寫的方式讀取數據。
使用架構提示覆寫架構推斷
您可以使用架構提示來強制執行所知道且預期於推斷架構上的架構資訊。 當您知道資料列是特定資料類型,或如果您想要選擇更一般資料類型時(例如 double
,而不是 ), integer
您可以使用 SQL 架構規格語法,為資料行數據類型提供任意數目的提示,例如:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
如需支援的數據類型清單,請參閱數據類型的檔。
如果數據流開頭沒有數據行,您也可以使用架構提示將該數據行新增至推斷的架構。
以下是推斷架構的範例,可查看具有架構提示的行為。
推斷的架構:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
藉由指定下列架構提示:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
您得到:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
Databricks Runtime 9.1 LTS 和更新版本提供數位和對應架構提示支援。
以下是具有複雜數據類型的推斷架構範例,可查看具有架構提示的行為。
推斷的架構:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
藉由指定下列架構提示:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
您得到:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
只有在未提供架構給自動載入器時,才會使用架構提示。 您可以使用架構提示,無論是 cloudFiles.inferColumnTypes
啟用還是停用。