Udostępnij za pośrednictwem


Ładowanie danych za pomocą biblioteki DLT

Dane można załadować z dowolnego źródła danych obsługiwanego przez platformę Apache Spark w usłudze Azure Databricks przy użyciu biblioteki DLT. Zestawy danych (tabele i widoki) można zdefiniować w DLT względem dowolnego zapytania zwracającego ramkę danych Spark, w tym strumieniowych ramek danych i bibliotekę Pandas dla ramek danych Spark. Dla procesów integracyjnych danych usługa Databricks zaleca używanie tabel przesyłania strumieniowego w większości przypadków użycia. Tabele przesyłania strumieniowego są dobre do pozyskiwania danych z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładującego lub z magistrali komunikatów, takich jak Kafka. W poniższych przykładach przedstawiono niektóre typowe wzorce.

Ważny

Nie wszystkie źródła danych mają obsługę języka SQL. Możesz łączyć notesy SQL i Python w potoku DLT, aby używać języka SQL dla wszystkich operacji oprócz importu danych.

Aby uzyskać szczegółowe informacje na temat pracy z bibliotekami, które nie są domyślnie pakowane w bibliotece DLT, zobacz Zarządzanie zależnościami języka Python dla potoków DLT.

ładowanie plików z magazynu obiektów w chmurze

Databricks rekomenduje używanie Auto Loader z technologią DLT do większości zadań związanych z pozyskiwaniem danych z magazynu obiektów w chmurze. Automatyczne ładowanie i DLT są przeznaczone do przyrostowego i idempotentnego ładowania stale rosnących danych, gdy tylko napływają do magazynu w chmurze. W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Notatka

Aby załadować pliki z automatycznym modułem ładującym w potoku z włączonym katalogiem Unity, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej na temat korzystania z Unity Catalog z potokami DLT, zobacz Używanie Unity Catalog z potokami DLT.

Pyton

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Zobacz Co to jest moduł automatycznego ładowania? i Składnia SQL automatycznego modułu ładującego SQL.

Ostrzeżenie

Jeśli używasz automatycznego modułu ładującego z powiadomieniami o plikach i uruchamiasz pełne odświeżanie potoku lub tabeli przesyłania strumieniowego, musisz ręcznie wyczyścić zasoby. Aby wykonać czyszczenie, możesz użyć CloudFilesResourceManager w notatniku.

Ładowanie danych z magistrali komunikatów

Potoki DLT można skonfigurować do pozyskiwania danych z magistrali komunikatów za pomocą tabel przesyłania strumieniowego. Databricks zaleca łączenie tabel strumieniowych z ciągłym wykonywaniem i ulepszonym skalowaniem automatycznym, aby zapewnić najbardziej efektywne wczytywanie przy niskich opóźnieniach z magistrali komunikatów. Zobacz Optymalizowanie wykorzystania klastra potoków DLT za pomocą rozszerzonego skalowania automatycznego.

Na przykład poniższy kod konfiguruje tabelę przesyłania strumieniowego w celu pozyskiwania danych z platformy Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Operacje podrzędne można zapisywać w czystym języku SQL, aby wykonywać przekształcenia strumieniowe na tych danych, jak w poniższym przykładzie:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Aby zapoznać się z przykładem pracy z usługą Event Hubs, zobacz Użyj Azure Event Hubs jako źródła danych DLT.

Zobacz Konfigurowanie źródeł danych przesyłania strumieniowego.

Ładowanie danych z systemów zewnętrznych

Biblioteka DLT obsługuje ładowanie danych z dowolnego źródła danych obsługiwanego przez usługę Azure Databricks. Zobacz Connect to data sources (Łączenie ze źródłami danych). Możesz również załadować dane zewnętrzne za pomocą usługi Lakehouse Federation dla obsługiwanych źródeł danych . Ponieważ federacja usługi Lakehouse wymaga środowiska Databricks Runtime 13.3 LTS lub nowszego, aby można było używać federacji usługi Lakehouse, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

Niektóre źródła danych nie mają równoważnej obsługi w języku SQL. Jeśli nie możesz użyć Federation Lakehouse z jednym z tych źródeł danych, możesz użyć notatnika Python do pobierania danych ze źródła. Możesz dodać język Python i kod źródłowy SQL do tego samego potoku DLT. Poniższy przykład deklaruje zmaterializowany widok, aby uzyskać dostęp do bieżącego stanu danych w zdalnej tabeli PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Ładowanie małych lub statycznych zestawów danych z magazynu obiektów w chmurze

Małe lub statyczne zestawy danych można załadować przy użyciu składni ładowania platformy Apache Spark. Biblioteka DLT obsługuje wszystkie formaty plików obsługiwane przez platformę Apache Spark w usłudze Azure Databricks. Aby uzyskać pełną listę, zobacz Opcje formatowania danych.

W poniższych przykładach pokazano ładowanie kodu JSON w celu utworzenia tabel DLT:

Pyton

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Notatka

Konstrukcja SELECT * FROM format.`path`; SQL jest wspólna dla wszystkich środowisk SQL w usłudze Azure Databricks. Jest to zalecany wzorzec bezpośredniego dostępu do plików przy użyciu języka SQL z biblioteką DLT.

Bezpiecznie uzyskaj dostęp do poświadczeń magazynu za pomocą tajnych wartości w potoku danych

Możesz użyć Azure Databricks do przechowywania tajnych danych, takich jak klucze dostępu lub hasła. Aby skonfigurować tajne dane w potoku, użyj właściwości Spark w konfiguracji klastra w ustawieniach potoku. Zobacz Konfigurowanie obliczeń dla potoku DLT.

W poniższym przykładzie użyto wpisu tajnego do przechowywania klucza dostępu wymaganego do odczytu danych wejściowych z konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu automatycznego modułu ładującego. Możesz użyć tej samej metody do skonfigurowania dowolnego tajnego elementu wymaganego przez potok, na przykład kluczy AWS do uzyskania dostępu do S3 lub hasła do magazynu metadanych Apache Hive.

Aby dowiedzieć się więcej na temat pracy z usługą Azure Data Lake Storage Gen2, zobacz Łączenie z usługą Azure Data Lake Storage Gen2 i Blob Storage.

Notatka

Należy dodać prefiks spark.hadoop. do klucza konfiguracji spark_conf, który ustawia tajną wartość.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Zastąpić

  • <storage-account-name> z nazwą konta magazynu ADLS Gen2.
  • <scope-name> z nazwą zakresu sekretnych danych usługi Azure Databricks.
  • <secret-name> z nazwą klucza zawierającego klucz dostępu do konta usługi Azure Storage.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Zastąpić

  • <container-name> z nazwą kontenera konta usługi Azure Storage, który przechowuje dane wejściowe.
  • <storage-account-name> z nazwą konta magazynu ADLS Gen2.
  • <path-to-input-dataset> z ścieżką do wejściowego zestawu danych.

Ładowanie danych z usługi Azure Event Hubs

Azure Event Hubs to usługa przesyłania strumieniowego danych, która zapewnia interfejs zgodny z platformą Apache Kafka. Aby załadować komunikaty z usługi Azure Event Hubs, możesz użyć łącznika Kafka do Przesyłania Strumieniowego z Strukturą, uwzględnionego w środowisku uruchomieniowym DLT. Aby dowiedzieć się więcej na temat ładowania i przetwarzania komunikatów z usługi Azure Event Hubs, zobacz Użyj Azure Event Hubs jako źródła danych DLT.