使用 DLT 載入數據
您可以使用 DLT,從 Azure Databricks 上的 Apache Spark 所支援的任何數據源載入數據。 您可以針對任何傳回 Spark DataFrame 的查詢,定義 DLT 中的數據集(數據表和檢視表),包括 Spark DataFrame 的串流數據框架和 Pandas。 針對數據擷取工作,Databricks 建議針對大部分的使用案例使用串流數據表。 串流數據表適用於使用自動載入器從雲端物件記憶體擷取數據,或從 Kafka 之類的訊息總線擷取數據。 下列範例示範一些常見的模式。
重要
並非所有數據源都有 SQL 支援。 在 DLT 管線中,您可以混合使用 SQL 和 Python 筆記本,以便在資料擷取之後的所有操作中使用 SQL。
如需使用預設不包括在 DLT 中的庫的詳細資訊,請見 管理 DLT 管線的 Python 相依性。
從雲端物件記憶體載入檔案
Databricks 建議針對雲端物件記憶體的大部分數據擷取工作,搭配 DLT 使用自動載入器。 自動載入器和 DLT 的設計目的是在數據抵達雲端記憶體時,以累加方式和等冪方式載入不斷成長的數據。 下列範例使用自動載入器從 CSV 和 JSON 檔案建立資料集:
注意
您若要在已啟用 Unity Catalog 的管線中使用自動載入器載入檔案,必須使用 外部位置。 若要深入瞭解如何搭配 DLT 使用 Unity 目錄,請參閱 搭配您的 DLT 管線使用 Unity 目錄。
蟒
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
請參閱 什麼是自動載入器? 與 自動載入器 SQL 語法。
警告
如果您使用自動載入器搭配檔案通知,並針對管線或串流數據表執行完整重新整理,則必須手動清除您的資源。 您可以使用筆記本中的 CloudFilesResourceManager 來執行清除。
從訊息總線載入數據
您可以將 DLT 管線設定為使用串流資料表從訊息總線擷取數據。 Databricks 建議結合串流數據表、連續執行和增強型自動調整功能,以提供最有效的數據擷取,從訊息總線實現低延遲的載入。 請參閱 透過強化自動縮放優化 DLT 管線的叢集效能。
例如,下列程式代碼會將串流數據表設定為從 Kafka 擷取資料:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
您可以在純 SQL 撰寫下游作業,以針對此資料執行串流轉換,如下列範例所示:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
如需使用事件中樞的範例,請參閱 使用 Azure 事件中樞作為 DLT 資料來源。
請參閱 設定串流資料來源。
從外部系統載入數據
DLT 支援從 Azure Databricks 所支援的任何數據源載入數據。 請參閱 連線至資料來源。 您也可以使用 Lakehouse Federation 載入外部資料,適用於 支援的數據源。 由於要使用 Lakehouse 聯盟需要 Databricks Runtime 13.3 LTS 或更新版本,因此要使用 Lakehouse 聯盟,您必須將管線設定為使用 預覽通道。
某些數據源在 SQL 中沒有對等的支援。 如果您無法使用 Lakehouse Federation 搭配其中一個數據來源,您可以使用 Python 筆記本從來源擷取數據。 您可以將 Python 和 SQL 原始程式碼新增至相同的 DLT 管線。 下列範例會宣告具體化檢視,以存取遠端 PostgreSQL 數據表中的數據目前狀態:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
從雲端物件記憶體載入小型或靜態數據集
您可以使用 Apache Spark 載入語法來載入小型或靜態數據集。 DLT 支援 Azure Databricks 上 Apache Spark 支援的所有檔案格式。 如需完整清單,請參閱 數據格式選項。
下列範例示範載入 JSON 以建立 DLT 資料表:
蟒
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
注意
SELECT * FROM format.`path`;
SQL 建構適用於所有 Azure Databricks 上的 SQL 環境。 建議使用 SQL 搭配 DLT 進行直接檔案存取的模式。
使用管線中的秘密安全地存取記憶體認證
您可以使用 Azure Databricks 秘密 來儲存認證,例如存取密鑰或密碼。 若要在管線中設定秘密,請在管線設定叢集組態中使用Spark屬性。 請參閱 配置 DLT 管線的運算資源。
下列範例會使用秘密來儲存從 Azure Data Lake Storage Gen2 (ADLS Gen2) 儲存體帳戶讀取輸入資料所需的存取金鑰,自動載入器。 您可以使用這個相同的方法來設定管線所需的任何秘密,例如 AWS 密鑰來存取 S3 或 Apache Hive 中繼存放區的密碼。
若要深入瞭解如何使用 Azure Data Lake Storage Gen2,請參閱 連線至 Azure Data Lake Storage Gen2 和 Blob 記憶體。
注意
您必須將 spark.hadoop.
前綴新增至設定秘密值的 spark_conf
配置密鑰。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
取代
-
<storage-account-name>
具有ADLS Gen2記憶體帳戶名稱。 - 將
<scope-name>
與 Azure Databricks 秘密範圍名稱關聯。 -
<secret-name>
為包含 Azure 儲存帳戶存取金鑰的金鑰名稱。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
取代
-
<container-name>
,其中包含儲存輸入數據的 Azure 記憶體帳戶容器名稱。 -
<storage-account-name>
具有ADLS Gen2記憶體帳戶名稱。 -
<path-to-input-dataset>
附有輸入數據集的路徑。
從 Azure 事件中樞載入數據
Azure 事件中樞是提供 Apache Kafka 相容介面的數據串流服務。 您可以使用 DLT 執行時間中包含的結構化串流 Kafka 連接器,從 Azure 事件中樞載入訊息。 若要深入瞭解如何從 Azure 事件中樞載入和處理訊息,請參閱 使用 Azure 事件中樞作為 DLT 數據源。