Udostępnij za pośrednictwem


Ładowanie danych za pomocą tabel delta live

Dane można załadować z dowolnego źródła danych obsługiwanego przez platformę Apache Spark w usłudze Azure Databricks przy użyciu tabel delta live. Zestawy danych (tabele i widoki) w Delta Live Tables można definiować względem dowolnego zapytania zwracającego ramkę danych Spark, w tym ramki danych strumieniowych oraz Pandas dla ramek danych Spark. W przypadku zadań pozyskiwania danych Databricks zaleca używanie tabel przesyłania strumieniowego w większości zastosowań. Tabele przesyłania strumieniowego są dobre do pozyskiwania danych z magazynu obiektów w chmurze przy użyciu Auto Loader lub z systemów przesyłania komunikatów, takich jak Kafka. W poniższych przykładach przedstawiono kilka typowych wzorców.

Ważne

Nie wszystkie źródła danych mają obsługę języka SQL. Możesz mieszać notesy SQL i Python w potoku Delta Live Tables, aby używać języka SQL dla wszystkich operacji poza wczytywaniem.

Aby uzyskać szczegółowe informacje na temat pracy z bibliotekami, które nie są domyślnie pakowane w Delta Live Tables, zobacz Zarządzanie zależnościami Pythona dla potoków Delta Live Tables.

Wczytaj pliki z magazynu obiektów w chmurze

Databricks zaleca używanie Auto Loader z tabelami Delta Live Tables do większości zadań związanych z pozyskiwaniem danych z magazynu obiektów w chmurze. „Auto Loader” i „Delta Live Tables” są przeznaczone do przyrostowego i idempotentnego ładowania ciągle rosnących danych w miarę ich napływu do magazynu w chmurze. W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Uwaga

Aby załadować pliki za pomocą Auto Loader w potoku z włączonym Unity Catalog, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej na temat korzystania z Unity Catalog z Delta Live Tables, zobacz Use Unity Catalog with your Delta Live Tables pipelines.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Zobacz What is Auto Loader? and Auto Loader SQL syntax (Co to jest moduł automatycznego ładowania? i składnia SQL modułu ładującego automatycznego).

Ostrzeżenie

Jeśli używasz automatycznego modułu ładującego z powiadomieniami o plikach i uruchamiasz pełne odświeżanie potoku lub tabeli przesyłania strumieniowego, musisz ręcznie wyczyścić zasoby. Aby wykonać oczyszczanie, możesz użyć elementu CloudFilesResourceManager w notesie.

Załaduj dane z magistrali komunikatów

Można skonfigurować potoki Delta Live Tables do pozyskiwania danych z magistrali komunikatów przy użyciu tabel przesyłanych strumieniowo. Usługa Databricks zaleca łączenie tabel przesyłania strumieniowego z ciągłym wykonywaniem i ulepszonym skalowaniem automatycznym w celu zapewnienia najbardziej wydajnego pozyskiwania na potrzeby ładowania z magistrali komunikatów o małych opóźnieniach. Zobacz Optymalizacja wykorzystania klastrów w potokach Delta Live Tables za pomocą ulepszonego automatycznego skalowania.

Na przykład poniższy kod konfiguruje tabelę przesyłania strumieniowego do pozyskiwania danych z systemu Kafka.

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Operacje podrzędne można zapisywać w czystym języku SQL, aby wykonywać przekształcenia przesyłania strumieniowego na tych danych, jak w poniższym przykładzie:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(kafka_raw)
WHERE ...

Aby zapoznać się z przykładem pracy z usługą Event Hubs, zobacz Użyj Azure Event Hubs jako źródła danych dla Delta Live Tables.

Zobacz Konfigurowanie źródeł danych przesyłanych strumieniowo.

Załaduj dane z systemów zewnętrznych

Usługa Delta Live Tables obsługuje ładowanie danych z dowolnego źródła danych obsługiwanego przez usługę Azure Databricks. Zobacz Nawiązywanie połączenia ze źródłami danych. Możesz również załadować dane zewnętrzne przy użyciu usługi Lakehouse Federation dla obsługiwanych źródeł danych. Ponieważ federacja usługi Lakehouse wymaga środowiska Databricks Runtime 13.3 LTS lub nowszego, aby można było używać federacji usługi Lakehouse, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

Niektóre źródła danych nie mają równoważnej obsługi w języku SQL. Jeśli nie możesz użyć federacji lakehouse z jednym z tych źródeł danych, możesz użyć notesu języka Python do pozyskiwania danych ze źródła. Kod źródłowy języka Python i SQL można dodać do tego samego potoku Delta Live Tables. Poniższy przykład deklaruje zmaterializowany widok, aby uzyskać dostęp do bieżącego stanu danych w zdalnej tabeli PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Załaduj małe lub statyczne zestawy danych z magazynu obiektów w chmurze

Małe lub statyczne zestawy danych można załadować przy użyciu składni ładowania platformy Apache Spark. Funkcja Delta Live Tables obsługuje wszystkie formaty plików obsługiwane przez platformę Apache Spark w usłudze Azure Databricks. Aby uzyskać pełną listę, zobacz Opcje formatowania danych.

W poniższych przykładach pokazano ładowanie kodu JSON w celu utworzenia tabel delta live tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Uwaga

Konstrukcja SELECT * FROM format.`path`; SQL jest wspólna dla wszystkich środowisk SQL w usłudze Azure Databricks. Jest to zalecany wzorzec bezpośredniego dostępu do plików przy użyciu języka SQL z tabelami delta live.

Bezpieczny dostęp do poświadczeń magazynu za pomocą sekretów w pipeline'u

Możesz użyć Azure Databricks tajemnic do przechowywania poświadczeń, takich jak klucze dostępu lub hasła. Aby skonfigurować wpis tajny w potoku, użyj właściwości Spark w konfiguracji klastra ustawień potoku. Zobacz Configure compute for a Delta Live Tables pipeline (Konfigurowanie obliczeń dla potoku tabel na żywo delty).

W poniższym przykładzie użyto wpisu tajnego do przechowywania klucza dostępu wymaganego do odczytu danych wejściowych z konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu funkcji automatycznego ładowania. Tej samej metody można użyć do skonfigurowania dowolnego wpisu tajnego wymaganego przez potok, na przykład kluczy platformy AWS w celu uzyskania dostępu do usługi S3 lub hasła do magazynu metadanych Apache Hive.

Aby dowiedzieć się więcej na temat pracy z usługą Azure Data Lake Storage Gen2, zobacz Nawiązywanie połączenia z usługą Azure Data Lake Storage Gen2 i usługą Blob Storage.

Uwaga

Należy dodać spark.hadoop. prefiks do spark_conf klucza konfiguracji, który ustawia wartość wpisu tajnego.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <scope-name> z nazwą zakresu wpisu tajnego usługi Azure Databricks.
  • <secret-name> z nazwą klucza zawierającego klucz dostępu do konta usługi Azure Storage.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> z nazwą kontenera konta usługi Azure Storage, który przechowuje dane wejściowe.
  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <path-to-input-dataset> ze ścieżką do wejściowego zestawu danych.

Ładowanie danych z usługi Azure Event Hubs

Azure Event Hubs to usługa przesyłania strumieniowego danych, która zapewnia interfejs zgodny z platformą Apache Kafka. Aby załadować komunikaty z usługi Azure Event Hubs, możesz użyć łącznika Structured Streaming dla Kafki, uwzględnionego w środowisku uruchomieniowym Delta Live Tables. Aby dowiedzieć się więcej na temat ładowania i przetwarzania komunikatów z usługi Azure Event Hubs, zobacz Użyj Azure Event Hubs jako źródła danych dla Delta Live Tables.