共用方式為


使用 Delta Live Tables 載入數據

您可以使用 Delta Live Tables,從 Azure Databricks 上的 Apache Spark 所支援的任何數據源載入數據。 您可以在 Delta Live Tables 中針對傳回 Spark DataFrame 的任何查詢定義數據集(tables 和 views),包括 Spark DataFrame 的串流數據框架和 Pandas。 針對數據引入工作,Databricks 建議對大部分的使用案例使用串流 tables。 串流 tables 適用於使用自動載入器從雲端物件儲存區擷取數據,或從 Kafka 之類的訊息總線擷取數據。 下列範例示範一些常見的模式。

重要

並非所有數據源都有 SQL 支援。 您可以將 SQL 和 Python 筆記本混搭成 Delta Live Tables 管線,以 SQL 處理除擷取外的所有操作。

如需了解使用預設未封裝在 Delta Live Tables 中的函式庫的資訊詳情,請參閱 管理 Delta Live Tables 管線的 Python 相依性

從雲端物件記憶體載入檔案

Databricks 建議針對雲端物件記憶體的大部分數據擷取工作,搭配 Delta Live Tables 使用自動載入器。 自動加載器和 Delta Live Tables 的設計目的是在資料抵達雲端儲存時,增量地和等冪地載入不斷增長的數據。 下列範例使用自動載入器從 CSV 和 JSON 檔案建立資料集:

注意

若要在啟用了 Unity Catalog 的管線中使用自動載入器載入檔案,您必須使用 外部位置。 若要深入瞭解如何搭配 Delta Live Tables使用 Unity Catalog,請參閱 使用 Unity Catalog 搭配您的 Delta Live Tables 管線

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

請參閱 什麼是自動載入器?自動載入器 SQL 語法

警告

如果您使用自動載入器搭配檔案通知,並針對管線或串流 table執行完整的 refresh,您必須手動清除資源。 您可以在筆記本中使用 CloudFilesResourceManager 來執行清除。

從訊息匯流排載入資料

您可以將 Delta Live Tables 管線設定為從訊息總線以串流 tables引入數據。 Databricks 建議結合串流 tables、連續執行和增強的自動擴展,以最有效率地擷取低延遲的訊息總線載入。 查看 Optimize Delta Live Tables 管線叢集的使用率與增強型自動調整

例如,下列程式碼會設定串流 table 從 Kafka 引入資料:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

您可以在純 SQL 撰寫下游作業,以針對此資料執行串流轉換,如下列範例所示:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

如需了解使用 Event Hubs 的範例,請參閱 使用 Azure Event Hubs 作為 Delta Live Tables 數據來源

請參閱 <設定串流資料來源>。

從外部系統載入資料

Delta Live Tables 支援從 Azure Databricks 所支援的任何數據源載入數據。 請參閱 連線至數據源。 您也可以針對支持的數據源使用 Lakehouse 同盟載入外部數據。 因為 Lakehouse 同盟需要 Databricks Runtime 13.3 LTS 或更新版本,若要使用 Lakehouse 同盟,您必須將管線設定為使用 預覽通道

某些數據源在 SQL 中沒有對等的支援。 如果您無法搭配其中一個數據源使用 Lakehouse 同盟,您可以使用 Python 筆記本從來源內嵌數據。 您可以將 Python 和 SQL 原始程式碼新增至相同的 Delta Live Tables 管線。 下列範例會宣告具體化檢視,以存取遠端 PostgreSQL table中數據的目前狀態:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

從雲端物件儲存體載入小型或靜態資料集

您可以使用 Apache Spark 載入語法來載入小型或靜態數據集。 Delta Live Tables 支援 Azure Databricks 上 Apache Spark 支援的所有檔案格式。 如需完整的 list,請參閱 資料格式選項

下列範例示範載入 JSON 以建立 Delta Live Tablestables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

注意

SELECT * FROM format.`path`; SQL 建構適用於所有 Azure Databricks 上的 SQL 環境。 建議的模式是使用 SQL 搭配 Delta Live Tables,進行直接檔案存取。

使用管線中的秘密安全地存取記憶體 credentials

您可以使用 Azure Databricks 秘密 來儲存 credentials,例如存取密鑰或密碼。 若要在管線中設定秘密,請在管線設定叢集組態中使用Spark屬性。 請參閱 為 Delta Live Tables 管道配置計算資源。

下列範例會使用秘密來儲存使用自動載入器從 Azure Data Lake Storage Gen2 (ADLS Gen2) 記憶體帳戶讀取輸入數據所需的存取密鑰。 您可以使用這個相同的方法來設定管線所需的任何秘密,例如 AWS 密鑰來存取 S3 或 Apache Hive 中繼存放區的密碼。

若要深入瞭解如何使用 Azure Data Lake Storage Gen2,請參閱 聯機到 Azure Data Lake Storage Gen2 和 Blob 記憶體

注意

您必須將 spark.hadoop. 前置詞新增至 spark_conf 設定秘密值的組態金鑰。

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> 具有ADLS Gen2記憶體帳戶名稱。
  • 具有 Azure Databricks 祕密範圍名稱的 <scope-name>
  • 具有包含 Azure 儲存體帳戶存取金鑰之金鑰名稱的 <secret-name>
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> 使用儲存輸入數據的 Azure 記憶體帳戶容器名稱。
  • <storage-account-name> 具有ADLS Gen2記憶體帳戶名稱。
  • <path-to-input-dataset> 具有輸入數據集的路徑。

從Azure 事件中樞載入數據

Azure 事件中樞 是提供 Apache Kafka 相容介面的數據串流服務。 您可以使用 Delta Live Tables 執行時間中包含的結構化串流 Kafka 連接器,從 Azure 事件中樞載入訊息。 若要深入瞭解如何從 Azure 事件中樞載入和處理訊息,請參閱 使用 Azure 事件中樞作為 Delta Live Tables 資料來源