Udostępnij za pośrednictwem


Załaduj dane za pomocą platformy Delta Live Tables

Dane można załadować z dowolnego źródła danych obsługiwanego przez platformę Apache Spark w usłudze Azure Databricks przy użyciu platformy Delta Live Tables. Zestawy danych (tabele i widoki) można zdefiniować w tabelach platformy Delta Live Tables względem dowolnego zapytania zwracającego Spark DataFrame, w tym DataFrames i Pandas przesyłania strumieniowego i biblioteki dla Spark DataFrames. W przypadku zadań pozyskiwania danych usługa Databricks zaleca używanie tabel przesyłania strumieniowego w większości przypadków użycia. Tabele przesyłania strumieniowego są dobre do pozyskiwania danych z magazynu obiektów w chmurze przy użyciu modułu automatycznego ładowania lub z magistrali komunikatów, takich jak Kafka. W poniższych przykładach przedstawiono kilka typowych wzorców.

Ważne

Nie wszystkie źródła danych mają obsługę języka SQL. Notesy SQL i Python można mieszać w potoku Delta Live Tables, aby używać języka SQL dla wszystkich operacji poza pozyskiwaniem.

Aby uzyskać szczegółowe informacje na temat pracy z bibliotekami, które nie są domyślnie pakowane w tabelach delta Live Tables, zobacz Zarządzanie zależnościami języka Python dla potoków tabel na żywo delty.

Załaduj pliki z magazynu obiektów w chmurze

Usługa Databricks zaleca używanie automatycznego modułu ładującego z tabelami Delta Live Tables w przypadku większości zadań pozyskiwania danych z magazynu obiektów w chmurze. Automatyczne ładowanie i tabele na żywo delty są przeznaczone do przyrostowego i idempotentnego ładowania stale rosnących danych w miarę ich napływu do magazynu w chmurze. W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Uwaga

Aby załadować pliki z automatycznym modułem ładującym w potoku z obsługą wykazu aparatu Unity, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Zobacz What is Auto Loader? and Auto Loader SQL syntax (Co to jest moduł automatycznego ładowania? i składnia SQL modułu ładującego automatycznego).

Ostrzeżenie

Jeśli używasz automatycznego modułu ładującego z powiadomieniami o plikach i uruchamiasz pełne odświeżanie potoku lub tabeli przesyłania strumieniowego, musisz ręcznie wyczyścić zasoby. Aby wykonać oczyszczanie, możesz użyć elementu CloudFilesResourceManager w notesie.

Załaduj dane z magistrali komunikatów

Potoki delty tabel na żywo można skonfigurować do pozyskiwania danych z magistrali komunikatów przy użyciu tabel przesyłania strumieniowego. Usługa Databricks zaleca łączenie tabel przesyłania strumieniowego z ciągłym wykonywaniem i ulepszonym skalowaniem automatycznym w celu zapewnienia najbardziej wydajnego pozyskiwania na potrzeby ładowania z magistrali komunikatów o małych opóźnieniach. Zobacz Optymalizowanie wykorzystania klastra potoków tabel na żywo delty przy użyciu rozszerzonego skalowania automatycznego.

Na przykład poniższy kod konfiguruje tabelę przesyłania strumieniowego w celu pozyskiwania danych z platformy Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Operacje podrzędne można zapisywać w czystym języku SQL, aby wykonywać przekształcenia przesyłania strumieniowego na tych danych, jak w poniższym przykładzie:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Przykład pracy z usługą Event Hubs można znaleźć w temacie Use Azure Event Hubs as a Delta Live Tables data source (Używanie usługi Azure Event Hubs jako źródła danych delty tabel na żywo).

Zobacz Konfigurowanie źródeł danych przesyłanych strumieniowo.

Załaduj dane z systemów zewnętrznych

Usługa Delta Live Tables obsługuje ładowanie danych z dowolnego źródła danych obsługiwanego przez usługę Azure Databricks. Zobacz Nawiązywanie połączenia ze źródłami danych. Możesz również załadować dane zewnętrzne przy użyciu usługi Lakehouse Federation dla obsługiwanych źródeł danych. Ponieważ federacja usługi Lakehouse wymaga środowiska Databricks Runtime 13.3 LTS lub nowszego, aby można było używać federacji usługi Lakehouse, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

Niektóre źródła danych nie mają równoważnej obsługi w języku SQL. Jeśli nie możesz użyć federacji lakehouse z jednym z tych źródeł danych, możesz użyć notesu języka Python do pozyskiwania danych ze źródła. Kod źródłowy języka Python i SQL można dodać do tego samego potoku tabel na żywo delty. Poniższy przykład deklaruje zmaterializowany widok, aby uzyskać dostęp do bieżącego stanu danych w zdalnej tabeli PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Załaduj małe lub statyczne zestawy danych z magazynu obiektów w chmurze

Małe lub statyczne zestawy danych można załadować przy użyciu składni ładowania platformy Apache Spark. Funkcja Delta Live Tables obsługuje wszystkie formaty plików obsługiwane przez platformę Apache Spark w usłudze Azure Databricks. Aby uzyskać pełną listę, zobacz Opcje formatowania danych.

W poniższych przykładach pokazano ładowanie kodu JSON w celu utworzenia tabel delta live tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Uwaga

Konstrukcja SELECT * FROM format.`path`; SQL jest wspólna dla wszystkich środowisk SQL w usłudze Azure Databricks. Jest to zalecany wzorzec bezpośredniego dostępu do plików przy użyciu języka SQL z tabelami delta live.

Bezpieczny dostęp do poświadczeń magazynu za pomocą wpisów tajnych w potoku

Wpisów tajnych usługi Azure Databricks można używać do przechowywania poświadczeń, takich jak klucze dostępu lub hasła. Aby skonfigurować wpis tajny w potoku, użyj właściwości Spark w konfiguracji klastra ustawień potoku. Zobacz Configure compute for a Delta Live Tables pipeline (Konfigurowanie obliczeń dla potoku tabel na żywo delty).

W poniższym przykładzie użyto wpisu tajnego do przechowywania klucza dostępu wymaganego do odczytu danych wejściowych z konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2) przy użyciu funkcji automatycznego ładowania. Tej samej metody można użyć do skonfigurowania dowolnego wpisu tajnego wymaganego przez potok, na przykład kluczy platformy AWS w celu uzyskania dostępu do usługi S3 lub hasła do magazynu metadanych Apache Hive.

Aby dowiedzieć się więcej na temat pracy z usługą Azure Data Lake Storage Gen2, zobacz Nawiązywanie połączenia z usługą Azure Data Lake Storage Gen2 i usługą Blob Storage.

Uwaga

Należy dodać spark.hadoop. prefiks do spark_conf klucza konfiguracji, który ustawia wartość wpisu tajnego.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <scope-name> z nazwą zakresu wpisu tajnego usługi Azure Databricks.
  • <secret-name> z nazwą klucza zawierającego klucz dostępu do konta usługi Azure Storage.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> z nazwą kontenera konta usługi Azure Storage, który przechowuje dane wejściowe.
  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <path-to-input-dataset> ze ścieżką do wejściowego zestawu danych.

Ładowanie danych z usługi Azure Event Hubs

Azure Event Hubs to usługa przesyłania strumieniowego danych, która zapewnia interfejs zgodny z platformą Apache Kafka. Aby załadować komunikaty z usługi Azure Event Hubs, możesz użyć łącznika Platformy Kafka ze strukturą przesyłania strumieniowego uwzględnionego w środowisku uruchomieniowym delta Live Tables. Aby dowiedzieć się więcej na temat ładowania i przetwarzania komunikatów z usługi Azure Event Hubs, zobacz Używanie usługi Azure Event Hubs jako źródła danych usługi Delta Live Tables.