Sdílet prostřednictvím


Načtení dat pomocí DLT

Data z libovolného zdroje dat podporovaného Apache Sparkem v Azure Databricks můžete načíst pomocí DLT. Datové sady (tabulky a pohledy) můžete definovat v DLT na základě libovolného dotazu, který vrací datový rámec Spark, včetně streamovaných datových rámců a Pandas pro datové rámce Spark. Pro úlohy příjmu dat doporučuje Databricks používat streamované tabulky pro většinu případů použití. Streamované tabulky jsou vhodné pro příjem dat z cloudového úložiště objektů pomocí Auto Loaderu nebo ze sběrnic zpráv, jako Kafka. Níže uvedené příklady ukazují některé běžné vzory.

Důležitý

Ne všechny zdroje dat mají podporu SQL. Poznámkové bloky SQL a Pythonu můžete kombinovat v kanálu DLT a používat SQL pro všechny operace nad rámec příjmu dat.

Podrobnosti o práci s knihovnami, které nejsou ve výchozím nastavení zabalené v DLT, najdete v tématu Správa závislostí Pythonu pro kanály DLT.

Načtení souborů z cloudového úložiště objektů

Databricks doporučuje používat Auto Loader s DLT pro většinu úloh příjmu dat z cloudového úložiště objektů. Auto Loader a DLT jsou navržené tak, aby postupně a idempotentním způsobem načítaly stále rostoucí data, jakmile dorazí do cloudového úložiště. Následující příklady používají Auto Loader k vytvoření datových sad ze souborů CSV a JSON:

Poznámka

Pokud chcete načíst soubory pomocí automatického zavaděče v pipeline s povoleným katalogem Unity, musíte využít externí umístění. Další informace o používání katalogu Unity s DLT najdete v tématu Použití katalogu Unity s kanály DLT.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Podívejte se na Co je automatický zavaděč? a syntaxe SQL automatického zavaděče.

Varování

Pokud používáte Auto Loader s oznámeními o souborech a spustíte úplnou aktualizaci pipeline nebo streamovací tabulky, musíte prostředky ručně vyčistit. K vyčištění můžete použít CloudFilesResourceManager v poznámkovém bloku.

Načtení dat ze sběrnice zpráv

Můžete nakonfigurovat DLT procesy, aby ingestovaly data ze sběrnice zpráv pomocí streamovaných tabulek. Databricks doporučuje kombinovat streamované tabulky s průběžným spouštěním a vylepšeným automatickým škálováním, aby se zajistilo nejúčinnější příjem dat pro načítání z sběrnic zpráv s nízkou latencí. Viz Optimalizace využití clusterů kanálů DLT s vylepšeným automatickým škálováním.

Následující kód například nakonfiguruje streamovací tabulku pro příjem dat ze systému Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Následné operace můžete napsat v čistém SQL pro provádění streamování transformací na těchto datech, jak ukazuje následující příklad:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Příklad práce se službou Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat DLT.

Viz Konfigurace streamovaných zdrojů dat.

Načtení dat z externích systémů

DLT podporuje načítání dat z libovolného zdroje dat podporovaného službou Azure Databricks. Viz Připojení ke zdrojům dat. Externí data můžete načíst také pomocí federace Lakehouse pro podporované zdroje dat. Vzhledem k tomu, že federace Lakehouse vyžaduje Databricks Runtime 13.3 LTS nebo vyšší, musí být pro použití služby Lakehouse Federation váš kanál nakonfigurovaný tak, aby používal kanál Preview.

Některé zdroje dat nemají v SQL ekvivalentní podporu. Pokud se službou Lakehouse Federation nemůžete použít některý z těchto zdrojů dat, můžete k ingestování dat ze zdroje použít poznámkový blok Pythonu. Zdrojový kód Pythonu a SQL můžete přidat do stejného kanálu DLT. Následující příklad deklaruje materializované zobrazení pro přístup k aktuálnímu stavu dat ve vzdálené tabulce PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Načtení malých nebo statických datových sad z cloudového úložiště objektů

Malé nebo statické datové sady můžete načíst pomocí syntaxe načtení Apache Sparku. DLT podporuje všechny formáty souborů podporované Apache Sparkem v Azure Databricks. Úplný seznam najdete v tématu Možnosti formátu dat.

Následující příklady ukazují načtení JSON pro vytvoření tabulek DLT:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Poznámka

Konstruktor SQL SELECT * FROM format.`path`; je společný pro všechna prostředí SQL v Azure Databricks. Tento vzor se doporučuje pro přímý přístup k souborům pomocí SQL s DLT.

Bezpečný přístup k přihlašovacím údajům úložiště s tajemstvími v rámci pipeline

K ukládání přihlašovacích údajů, jako jsou přístupové klíče nebo hesla, můžete použít azure Databricks tajných kódů. Ke konfiguraci tajného kódu v kanálu použijte vlastnost Spark v konfiguraci clusteru nastavení kanálu. Vizte konfiguraci výpočetních prostředků pro DLT pipeline.

Následující příklad používá tajemství k uložení přístupového klíče potřebného ke čtení vstupních dat z účtu úložiště Azure Data Lake Storage Gen2 (ADLS Gen2) pomocí Auto Loader. Stejnou metodu můžete použít ke konfiguraci jakéhokoli tajného kódu vyžadovaného vaším kanálem, například klíčů AWS pro přístup k S3 nebo k heslu k metastoru Apache Hive.

Další informace o práci s Azure Data Lake Storage Gen2 najdete v tématu Připojení k Azure Data Lake Storage Gen2 a službě Blob Storage.

Poznámka

Do konfiguračního klíče spark_conf, který nastaví hodnotu tajného klíče, musíte přidat předponu spark.hadoop..

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Nahradit

  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <scope-name> s názvem tajné oblasti Azure Databricks.
  • <secret-name> s názvem klíče obsahujícího přístupový klíč účtu úložiště Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Nahradit

  • <container-name> s názvem kontejneru účtu úložiště Azure, který ukládá vstupní data.
  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <path-to-input-dataset> s cestou ke vstupní datové sadě.

Načtení dat z Azure Event Hubs

Azure Event Hubs je služba streamování dat, která poskytuje rozhraní kompatibilní s Apache Kafka. K načtení zpráv ze služby Azure Event Hubs můžete použít konektor Kafka strukturovaného streamování, který je součástí modulu runtime DLT. Další informace o načítání a zpracování zpráv ze služby Azure Event Hubs najdete v tématu Použití služby Azure Event Hubs jako zdroje dat DLT.