Načtení dat pomocí DLT
Data z libovolného zdroje dat podporovaného Apache Sparkem v Azure Databricks můžete načíst pomocí DLT. Datové sady (tabulky a pohledy) můžete definovat v DLT na základě libovolného dotazu, který vrací datový rámec Spark, včetně streamovaných datových rámců a Pandas pro datové rámce Spark. Pro úlohy příjmu dat doporučuje Databricks používat streamované tabulky pro většinu případů použití. Streamované tabulky jsou vhodné pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu nebo ze sběrnic zpráv, jako Kafka. Níže uvedené příklady ukazují některé běžné vzory.
Důležitý
Ne všechny zdroje dat mají podporu SQL. Poznámkové bloky SQL a Pythonu můžete kombinovat v kanálu DLT a používat SQL pro všechny operace nad rámec příjmu dat.
Podrobnosti o práci s knihovnami, které nejsou ve výchozím nastavení zabalené v DLT, najdete v tématu Správa závislostí Pythonu pro kanály DLT.
Načtení souborů z cloudového úložiště objektů
Databricks doporučuje používat Auto Loader s DLT pro většinu úloh příjmu dat z cloudového úložiště objektů. Auto Loader a DLT jsou navržené tak, aby postupně a idempotentním způsobem načítaly stále rostoucí data, jakmile dorazí do cloudového úložiště. Následující příklady používají Auto Loader k vytvoření datových sad ze souborů CSV a JSON:
Poznámka
Pokud chcete načíst soubory pomocí automatického zavaděče v pipeline s povoleným katalogem Unity, musíte využít externí umístění. Další informace o používání katalogu Unity s DLT najdete v tématu Použití katalogu Unity s kanály DLT.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Podívejte se na Co je automatický zavaděč? a syntaxe SQL automatického zavaděče.
Varování
Pokud používáte Auto Loader s oznámeními o souborech a spustíte úplnou aktualizaci pipeline nebo streamovací tabulky, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.
Načtení dat ze sběrnice zpráv
Můžete nakonfigurovat DLT procesy, aby ingestovaly data ze sběrnice zpráv pomocí streamovaných tabulek. Databricks doporučuje kombinovat streamované tabulky s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo nejúčinnější příjem dat pro načítání z sběrnic zpráv s nízkou latencí. Viz Optimalizace využití clusterů kanálů DLT s vylepšeným automatickým škálováním.
Následující kód například nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
Následné operace můžete napsat v čistém SQL pro provádění streamování transformací na těchto datech, jak ukazuje následující příklad:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Příklad práce se službou Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat DLT.
Viz Konfigurace streamovaných zdrojů dat.
Načtení dat z externích systémů
DLT podporuje načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat. Externí data můžete načíst také pomocí federace Lakehouse pro podporované zdroje dat. Vzhledem k tomu, že federace Lakehouse vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, musí být pro použití služby Lakehouse Federation váš kanál nakonfigurovaný tak, aby používal kanál Preview.
Některé zdroje dat nemají v SQL ekvivalentní podporu. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete k ingestování dat ze zdroje použít poznámkový blok Pythonu. Zdrojový kód Pythonu a SQL můžete přidat do stejného kanálu DLT. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Načtení malých nebo statických datových sad z cloudového úložiště objektů
Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. DLT podporuje všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.
Následující příklady ukazují načtení JSON pro vytvoření tabulek DLT:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Poznámka
Konstruktor SQL SELECT * FROM format.`path`;
je společný pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL s DLT.
Bezpečný přístup k přihlašovacím údajům úložiště s tajemstvími v rámci pipeline
K ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla, můžete použít azure Databricks tajných kódů. Ke konfiguraci tajného kódu v kanálu použijte vlastnost Spark v konfiguraci clusteru nastavení kanálu. Vizte konfiguraci výpočetních prostředků pro DLT pipeline.
Následující příklad používá tajemství k uložení přístupového klíče potřebného ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage Gen2 (ADLS Gen2) pomocí Auto Loader. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.
Další informace o práci s Azure Data Lake Storage Gen2 najdete v tématu Připojení k Azure Data Lake Storage Gen2 a službě Blob Storage.
Poznámka
Do konfiguračního klíče spark_conf
, který nastaví hodnotu tajného klíče, musíte přidat předponu spark.hadoop.
.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Nahradit
-
<storage-account-name>
s názvem účtu úložiště ADLS Gen2. -
<scope-name>
s názvem tajné oblasti Azure Databricks. -
<secret-name>
s názvem klíče obsahujícího přístupový klíč účtu úložiště Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Nahradit
-
<container-name>
s názvem kontejneru účtu úložiště Azure, který ukládá vstupní data. -
<storage-account-name>
s názvem účtu úložiště ADLS Gen2. -
<path-to-input-dataset>
s cestou ke vstupní datové sadě.
Načtení dat z Azure Event Hubs
Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime DLT. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat DLT.