共用方式為


常見的資料載入模式

自動載入器可簡化許多常見的數據擷取工作。 此快速參考提供數種熱門模式的範例。

使用 Glob 模式篩選目錄或檔案

Glob 模式可用於在路徑中提供時篩選目錄和檔案。

圖案 描述
? 比對任何單一字元
* 比對零或多個字元
[abc] 比對字元集 {a,b,c} 的單一字元。
[a-z] 比對字元範圍 {a...z} 內的單一字元。
[^a] 比對不在字元集或範圍 {a} 中的單一字元。 請注意, ^ 字元必須緊接在左括弧右邊。
{ab,cd} 比對字串集 {ab, cd} 中的字串。
{ab,c{de, fh}} 比對字串集 {ab, cde, cfh} 中的字串。

使用path來提供前綴模式,例如:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

您必須使用 選項 pathGlobFilter 來明確提供後綴模式。 path只會提供前置詞篩選條件。

例如,如果您想僅剖析目錄中特定後綴的png檔案,而該目錄包含不同後綴的檔案,您可以這樣做:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

注意

自動載入器的預設檔案匹配行為與其他"Spark"檔案來源的預設行為不同。 將 新增 .option("cloudFiles.useStrictGlobber", "true") 至您的讀取,以使用符合預設 Spark 行為與檔案來源的 Globbing。 如需有關 Globbing 的詳細資訊,請參閱下表:

圖案 檔案路徑 預設 globber 嚴格模式匹配
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt
/a/b /a/b.txt
/a/b/ /a/b.txt
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt
/a/*/c /a/b/c_file.txt
/a/*/c/ /a/b/c_file.txt
/a/*/c/ /a/*/cookie/file.txt
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

啟用簡單的 ETL

將數據放入 Delta Lake 而不遺失任何數據的簡單方式,就是使用下列模式,並使用自動載入器啟用架構推斷。 Databricks 建議在 Azure Databricks 作業中執行下列程式代碼,以在源數據的架構變更時自動重新啟動串流。 根據預設,架構會被推斷為字串類型,任何剖析錯誤(如果所有項目都維持為字串)都會移至 _rescued_data,而任何新的欄位都會使數據流失敗並更新架構。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala 程式語言

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

防止結構良好數據的遺失

當您知道架構,但想要知道何時收到非預期的數據時,Databricks 建議使用 rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

如果您想要讓串流在引進不符合架構的新欄位時停止處理,您可以新增:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

啟用彈性的半結構化數據管線

當您從新增數據列的廠商接收數據時,您可能無法確切知道他們何時進行此操作,或者您可能沒有足夠的資源來更新您的數據管道。 您現在可以利用架構演進來重新啟動數據流,並讓自動載入器自動更新推斷的架構。 您也可以利用 schemaHints 廠商可能提供的一些「無架構」欄位。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

轉換巢狀 JSON 數據

因為自動載入器將最上層的 JSON 欄位推斷為字串,您可能會遺留需要進一步轉換的巢狀 JSON 物件。 您可以使用 半結構化數據存取 API 來進一步轉換複雜的 JSON 內容。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

推斷巢狀 JSON 數據

當您有巢狀數據時,可以使用 cloudFiles.inferColumnTypes 選項來推斷數據和其他數據行類型的巢狀結構。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

載入不含標頭的 CSV 檔案

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

在具有標頭的 CSV 檔案上強制執行架構

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

將影像或二進位數據內嵌至 Delta Lake for ML

將數據儲存在 Delta Lake 中之後,您就可以對數據執行分散式推斷。 請參閱 使用 pandas UDF 執行分散式推斷。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT 的自動載入器語法

DLT 為自動載入器提供稍微修改的 Python 語法,為自動載入器新增 SQL 支援。

下列範例使用自動載入器從 CSV 和 JSON 檔案建立資料集:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

您可以使用自動載入器支援的格式選項。 read_files 的選項是鍵值對。 如需支援的格式和選項的詳細資訊,請參閱 選項

例如:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

下列範例會從具有標頭的索引標籤分隔 CSV 檔案讀取資料:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

您可以使用 schema 手動指定格式;您必須指定 schema 不支援 架構推斷的格式:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

注意

DLT 會在使用自動載入器讀取檔案時,自動設定和管理架構和檢查點目錄。 不過,如果您手動設定其中一個目錄,則執行完整重新整理不會影響已設定目錄的內容。 Databricks 建議使用自動設定的目錄,以避免在處理期間發生非預期的副作用。