Udostępnij za pośrednictwem


Dokumentacja języka PYTHON DLT

Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania języka Python DLT.

Aby uzyskać informacje na temat interfejsu API SQL, zobacz dokumentację języka SQL DLT.

Aby uzyskać szczegółowe informacje dotyczące konfigurowania automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.

Przed rozpoczęciem

Poniżej przedstawiono ważne zagadnienia dotyczące implementowania potoków za pomocą interfejsu JĘZYKA Python DLT:

  • Ponieważ funkcje table() i view() języka Python są wywoływane wiele razy podczas planowania i uruchamiania aktualizacji potoku, nie dołączaj kodu w którejkolwiek z tych funkcji, który może mieć skutki uboczne (na przykład kod modyfikujący dane lub wysyłający wiadomość e-mail). Aby uniknąć nieoczekiwanego zachowania, funkcje języka Python definiujące zestawy danych powinny zawierać tylko kod wymagany do zdefiniowania tabeli lub widoku.
  • Aby wykonywać operacje, takie jak wysyłanie wiadomości e-mail lub integrowanie z zewnętrzną usługą monitorowania, użyj haków zdarzeń, szczególnie w funkcjach definiujących zestawy danych. Zaimplementowanie tych operacji w funkcjach definiujących zestawy danych spowoduje nieoczekiwane zachowanie.
  • Funkcje table i view języka Python muszą zwracać ramkę danych. Niektóre funkcje działające na ramkach danych nie zwracają ramek danych i nie powinny być używane. Te operacje obejmują funkcje, takie jak collect(), count(), toPandas(), save()i saveAsTable(). Ponieważ transformacje ramki danych są wykonywane po, gdy został rozwiązany graf pełnego przepływu danych, użycie takich operacji może mieć niezamierzone skutki uboczne.

Importowanie modułu dlt Python

Funkcje języka Python DLT są definiowane w module dlt. Przepływy zaimplementowane przy użyciu interfejsu API języka Python muszą zaimportować ten moduł.

import dlt

Utwórz zmaterializowany widok DLT lub tabelę strumieniową

W języku Python DLT określa, czy zestaw danych ma być aktualizowany jako zmaterializowany widok, czy też tabela przesyłania strumieniowego w oparciu o definiujące zapytanie. Dekoratora @table można używać zarówno do definiowania zmaterializowanych widoków, jak i tabel strumieniowych.

Aby zdefiniować zmaterializowany widok w języku Python, zastosuj @table do zapytania wykonującego statyczny odczyt względem źródła danych. Aby zdefiniować tabelę przesyłania strumieniowego, zastosuj @table do zapytania, które wykonuje odczyt strumieniowy względem źródła danych lub użyj funkcji create_streaming_table(). Oba typy zestawów danych mają tę samą specyfikację składni w następujący sposób:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Tworzenie widoku DLT

Aby zdefiniować widok w języku Python, zastosuj dekorator @view. Podobnie jak dekorator @table, można użyć widoków w technologii DLT dla zestawów danych statycznych lub przesyłanych strumieniowo. Poniżej przedstawiono składnię definiującą widoki przy użyciu języka Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Przykład: Definiowanie tabel i widoków

Aby zdefiniować tabelę lub widok w języku Python, zastosuj @dlt.view lub dekorator @dlt.table do funkcji. Możesz użyć nazwy funkcji lub parametru name, aby przypisać tabelę lub nazwę widoku. Poniższy przykład definiuje dwa różne zestawy danych: widok o nazwie taxi_raw, który przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data, która przyjmuje widok taxi_raw jako dane wejściowe:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Przykład: uzyskiwanie dostępu do zestawu danych zdefiniowanego w tym samym potoku

Uwaga

Mimo że funkcje dlt.read() i dlt.read_stream() są nadal dostępne i w pełni obsługiwane przez interfejs DLT w języku Python, usługa Databricks zaleca zawsze używanie funkcji spark.read.table() i spark.readStream.table() z następujących powodów:

  • Funkcje spark obsługują odczytywanie zarówno wewnętrznych, jak i zewnętrznych zestawów danych, w tym tych zapisanych w magazynie zewnętrznym lub zdefiniowanych w innych potokach. Funkcje dlt obsługują tylko odczytywanie wewnętrznych zestawów danych.
  • Funkcje spark obsługują określanie opcji, takich jak skipChangeCommits, w celu odczytu operacji. Określanie opcji nie jest obsługiwane przez funkcje dlt.

Aby uzyskać dostęp do zestawu danych zdefiniowanego w tym samym potoku, użyj funkcji spark.read.table() lub spark.readStream.table().

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Uwaga

Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku, można bezpośrednio określić katalog i schemat lub użyć domyślnych ustawień skonfigurowanych w potoku. W tym przykładzie tabela customersjest zapisywana i odczytywana z domyślnego katalogu i schematu skonfigurowanych dla twojego potoku.

Przykład: odczyt z tabeli zarejestrowanej w magazynie metadanych

Aby odczytać dane z tabeli zarejestrowanej w magazynie metadanych Hive, w argumencie funkcji można zakwalifikować nazwę tabeli przy użyciu nazwy bazy danych:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Aby zapoznać się z przykładem odczytu z tabeli w katalogu Unity, zobacz Pozyskiwanie danych do potoku katalogu Unity.

przykład : uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql

Zestaw danych można również zwrócić przy użyciu wyrażenia spark.sql w funkcji zapytania. Aby odczytać z wewnętrznego zestawu danych, możesz pozostawić nazwę bez kwalifikacji, aby użyć domyślnego wykazu i schematu, lub można je wstępnie utworzyć:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Trwale usuń rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego

Aby trwale usunąć rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach delty obiektu. Aby zapewnić usunięcie rekordów z zmaterializowanego widoku, zobacz Trwałe usuwanie rekordów z zmaterializowanego widoku z włączonymi wektorami usuwania. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.

zapisywanie w zewnętrznych usługach przesyłania strumieniowego zdarzeń lub w tabelach delta przy użyciu interfejsu API sink DLT

Ważny

Interfejs API sink DLT znajduje się w publicznej wersji zapoznawczej .

Uwaga

  • Uruchomienie pełnego odświeżenia nie powoduje wyczyszczenia danych z odbiorników. Wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
  • Oczekiwania DLT nie są obsługiwane w API sink.

Aby zapisać dane do usługi przesyłania strumieniowego zdarzeń, takiej jak Apache Kafka lub Azure Event Hubs, bądź do tabeli Delta z potoku DLT, użyj funkcji create_sink() zawartej w module dlt Python. Po utworzeniu ujścia za pomocą funkcji create_sink() należy użyć ujścia w przepływie dołączania do zapisywania danych w ujściu. Przepływ dodawania jest jedynym typem przepływu obsługiwanym przez funkcję create_sink(). Inne typy przepływów, takie jak apply_changes, nie są obsługiwane.

Poniżej przedstawiono składnię umożliwiającą utworzenie ujścia za pomocą funkcji create_sink():

create_sink(<sink_name>, <format>, <options>)
Argumenty
name
Typ: str
Ciąg, który identyfikuje ujście i służy do odwoływania się do niego oraz zarządzania nim. Nazwy wyjść muszą być unikatowe dla pipeline'u, we wszystkich fragmentach kodu źródłowego, takich jak notatniki lub moduły będące częścią pipeline'u.
Ten parametr jest wymagany.
format
Typ: str
Ciąg definiujący format danych wyjściowych kafka lub delta.
Ten parametr jest wymagany.
options
Typ: dict
Opcjonalna lista opcji ujścia, sformatowana jako {"key": "value"}, gdzie zarówno klucz, jak i wartość są ciągami znaków. Obsługiwane są wszystkie opcje środowiska Databricks Runtime, które są obsługiwane przez wyjścia Kafka i Delta. Aby uzyskać informacje o opcjach Kafka, zobacz Konfiguracja modułu zapisywania strukturalnego przesyłania strumieniowego dla Kafka. Aby uzyskać informacje o opcjach funkcji delta, zobacz tabelę delty jako ujście.

Przykład: tworzenie sinku dla platformy Kafka za pomocą funkcji create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Przykład: tworzenie ujścia delty za pomocą funkcji create_sink() i ścieżki systemu plików

Poniższy przykład tworzy odbiornik, który zapisuje dane do tabeli Delta, podając ścieżkę systemu plików do tej tabeli.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Przykład: tworzenie ujścia Delta za pomocą funkcji create_sink() oraz nazwy tabeli w Unity Catalog

Uwaga

Zlewnia Delta obsługuje zewnętrzne i zarządzane tabele Unity Catalog oraz zarządzane tabele w Hive metastore. Nazwy tabel muszą być jednoznacznie określone. Na przykład tabele Unity Catalog muszą używać identyfikatora trójpoziomowego: <catalog>.<schema>.<table>. Tabele metadanych Hive muszą używać <schema>.<table>.

W poniższym przykładzie utworzono kanał, który zapisuje dane do tabeli Delta przez podanie nazwy tabeli w katalogu Unity.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Przykład: użycie przepływu dodawania w celu zapisu do Delta sinka

Poniższy przykład tworzy ujście, które zapisuje dane w tabeli Delta, a następnie tworzy strumień dołączania, aby zapisywać do tego ujścia.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Przykład: używanie przepływu dołączania do zapisu w ujściu platformy Kafka

Poniższy przykład tworzy odbiornik, który zapisuje dane w temacie Kafka, a następnie tworzy przepływ dołączania danych do zapisu w tym odbiorniku.

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Schemat DataFrame zapisanego na platformie Kafka powinien zawierać kolumny określone w Konfigurowaniu zapisywania strumieniowego Kafka.

Utwórz tabelę do użycia jako element docelowy operacji przesyłania strumieniowego

Użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową dla rekordów generowanych przez operacje przesyłania strumieniowego, w tym apply_changes(), apply_changes_from_snapshot()i @append_flow.

Uwaga

Funkcje create_target_table() i create_streaming_live_table() są przestarzałe. Usługa Databricks zaleca aktualizowanie istniejącego kodu w celu korzystania z funkcji create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumenty
name
Typ: str
Nazwa tabeli.
Ten parametr jest wymagany.
comment
Typ: str
Opcjonalny opis tabeli.
spark_conf
Typ: dict
Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties
Typ: dict
Opcjonalna lista właściwości tabeli.
partition_cols
Typ: array
Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
cluster_by
Typ: array
Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.
Zobacz Użyj klastrowania cieczy dla tabel Delta.
path
Typ: str
Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku.
schema
Typ: str lub StructType
Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail
Typ: dict
Opcjonalne ograniczenia dotyczące jakości danych dla tabeli. Zobacz różne oczekiwania.
row_filter (publiczna wersja zapoznawcza)
Typ: str
Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Kontrolowanie sposobu materializacji tabel

Tabele oferują również dodatkową kontrolę nad ich materializacją:

  • Określ, jak klastrować tabele przy użyciu cluster_by. Możesz użyć płynnego klastrowania, aby przyspieszyć zapytania. Zobacz Użyj klastrowania cieczy dla tabel Delta.
  • Określ sposób partycjonowania tabel przy użyciu partition_cols.
  • Właściwości tabeli można ustawić podczas definiowania widoku lub tabeli. Zobacz właściwości tabeli DLT .
  • Ustaw lokalizację przechowywania danych tabeli przy użyciu ustawienia path. Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśli path nie jest ustawiona.
  • W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określ schemat i kolumny klastrowe.

Uwaga

W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca, aby DLT kontrolowało organizację danych. Nie należy określać kolumn partycji, chyba że spodziewasz się, że tabela przekroczy terabajt.

Przykład: określić schemat i kolumny klastra

Opcjonalnie można określić schemat tabeli przy użyciu języka Python StructType lub ciągu DDL SQL. Gdy zdefiniowano ciąg DDL, definicja może zawierać kolumny wygenerowane .

Poniższy przykład tworzy tabelę o nazwie sales ze schematem określonym przy użyciu StructTypejęzyka Python:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumny klastrowania:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

Domyślnie biblioteka DLT wywnioskuje schemat z definicji table, jeśli nie określisz schematu.

Przykład: określanie kolumn partycji

Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumnę partycji:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Przykład: Definiowanie ograniczeń tabeli

Ważny

Ograniczenia tabel znajdują się w wersji zapoznawczej publicznej.

Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.

W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Przykład: Definiowanie filtru wierszy i maski kolumn

Ważny

Filtry wierszy i maski kolumn znajdują się w publicznej wersji zapoznawczej.

Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy oraz maską kolumn, użyj klauzuli ROW FILTER oraz klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Skonfiguruj tabelę streamingową, aby ignorować zmiany w źródłowej tabeli streamingowej

Uwaga

  • Flaga skipChangeCommits działa tylko z spark.readStream przy użyciu funkcji option(). Nie można użyć tej flagi w funkcji dlt.read_stream().
  • Nie można użyć flagi skipChangeCommits, gdy źródłowa tabela przesyłania strumieniowego jest zdefiniowana jako cel funkcji apply_changes() .

Domyślnie tabele przesyłania strumieniowego wymagają źródeł tylko do dołączania. Jeśli tabela przesyłania strumieniowego używa innej tabeli przesyłania strumieniowego jako źródła, a źródłowa tabela przesyłania strumieniowego wymaga aktualizacji lub usuwania, na przykład w ramach przetwarzania RODO "prawo do zapomnienia", można ustawić flagę skipChangeCommits podczas odczytywania tej tabeli, aby zignorować te zmiany. Aby uzyskać więcej informacji na temat tej flagi, zobacz Ignoruj aktualizacje i usuwa.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

właściwości DLT w Pythonie

W poniższych tabelach opisano opcje i właściwości, które można określić podczas definiowania tabel i widoków za pomocą biblioteki DLT:

@table lub @view
name
Typ: str
Opcjonalna nazwa tabeli lub widoku. Jeśli nie jest zdefiniowana, nazwa funkcji jest używana jako nazwa tabeli lub widoku.
comment
Typ: str
Opcjonalny opis tabeli.
spark_conf
Typ: dict
Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties
Typ: dict
Opcjonalna lista właściwości tabeli.
path
Typ: str
Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku.
partition_cols
Typ: a collection of str
Opcjonalna kolekcja, na przykład list składająca się z jednej lub więcej kolumn do celu partycjonowania tabeli.
cluster_by
Typ: array
Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.
Zobacz Użyj klastrowania cieczy dla tabel Delta.
schema
Typ: str lub StructType
Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType.
temporary
Typ: bool
Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Słowo kluczowe temporary nakazuje DLT utworzenie tabeli, która jest dostępna dla przepływu danych, ale nie powinna być używana poza tym przepływem. Aby skrócić czas przetwarzania, tabela tymczasowa jest zachowywana przez cały okres działania potoku, który ją tworzy, a nie tylko na potrzeby pojedynczej aktualizacji.
Wartość domyślna to "False".
row_filter (publiczna wersja zapoznawcza)
Typ: str
Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.
Definicja tabeli lub widoku
def <function-name>()
Funkcja języka Python, która definiuje zestaw danych. Jeśli parametr name nie jest ustawiony, <function-name> jest używana jako nazwa docelowego zestawu danych.
query
Instrukcja Spark SQL zwracająca zestaw danych Platformy Spark lub ramkę danych Koalas.
Użyj dlt.read() lub spark.read.table(), aby wykonać pełny odczyt z zestawu danych zdefiniowanego w tym samym potoku. Aby odczytać zewnętrzny zestaw danych, użyj funkcji spark.read.table(). Nie można użyć dlt.read() do odczytywania zewnętrznych zestawów danych. Ponieważ spark.read.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read().
Podczas definiowania zestawu danych w potoku domyślnie będzie on używać wykazu i schematu zdefiniowanego w konfiguracji potoku. Możesz użyć funkcji spark.read.table() do odczytu z zestawu danych zdefiniowanego w ramach procesu bez kwalifikacji. Aby na przykład odczytać z zestawu danych o nazwie customers:
spark.read.table("customers")
Możesz również użyć funkcji spark.read.table() do odczytu z tabeli zarejestrowanej w magazynie metadanych, opcjonalnie kwalifikując nazwę tabeli o nazwie bazy danych:
spark.read.table("sales.customers")
Użyj dlt.read_stream() lub spark.readStream.table(), aby wykonać odczyt strumieniowy z zestawu danych zdefiniowanego w tym samym potoku. Aby wykonać odczyt strumieniowy z zewnętrznego zestawu danych, użyj
funkcja spark.readStream.table(). Ponieważ spark.readStream.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read_stream().
Aby zdefiniować zapytanie w funkcji table DLT przy użyciu składni JĘZYKA SQL, użyj funkcji spark.sql. Zobacz Przykład: uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql. Aby zdefiniować zapytanie w funkcji table DLT przy użyciu języka Python, użyj składni PySpark.
Oczekiwania
@expect("description", "constraint")
Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_or_drop("description", "constraint")
Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_or_fail("description", "constraint")
Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, natychmiast zatrzymaj wykonywanie operacji.
@expect_all(expectations)
Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_all_or_drop(expectations)
Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_all_or_fail(expectations)
Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, natychmiast zatrzymaj wykonywanie.

przechwytywanie danych ze zestawienia zmian za pomocą języka Python w technologii DLT

Użyj funkcji apply_changes() w interfejsie API języka Python, aby używać funkcjonalności przechwytywania zmian danych dla DLT (CDC) do przetwarzania danych źródłowych z kanału danych o zmianach (CDF).

Ważny

Aby zastosować zmiany, należy zadeklarować tabelę docelową przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes() należy uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pola sequence_by.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Uwaga

W przypadku przetwarzania APPLY CHANGES, domyślne zachowanie podczas obsługi zdarzeń INSERT i UPDATE polega na przeprowadzaniu operacji upsert na zdarzeniach CDC ze źródła: zaktualizowanie wierszy w tabeli docelowej, które odpowiadają określonym kluczom, lub wstawienie nowego wiersza, jeśli odpowiadający rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić przy użyciu warunku APPLY AS DELETE WHEN.

Aby dowiedzieć się więcej na temat przetwarzania CDC z użyciem strumienia zmian, zapoznaj się z APPLY CHANGES API: uproszczenie przechwytywania zmian danych przy użyciu DLT. Aby zapoznać się z przykładem użycia funkcji apply_changes(), zobacz Przykład: przetwarzanie dla typów SCD 1 i SCD 2 przy użyciu danych źródłowych CDF.

Ważny

Aby zastosować zmiany, należy ustalić tabelę docelową przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes należy uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pole sequence_by.

Zobacz Interfejsy API APPLY CHANGES: upraszczanie przechwytywania zmian danych za pomocą DLT.

Argumenty
target
Typ: str
Nazwa tabeli, którą należy zaktualizować. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem funkcji apply_changes().
Ten parametr jest wymagany.
source
Typ: str
Źródło danych zawierające rekordy CDC.
Ten parametr jest wymagany.
keys
Typ: list
Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.
Możesz określić jedną z następujących opcji:
  • Lista ciągów: ["userId", "orderId"]
  • Lista funkcji usługi Spark SQL col(): [col("userId"), col("orderId"]

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Ten parametr jest wymagany.
sequence_by
Typ: str lub col()
Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Biblioteka DLT używa tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają w niewłaściwej kolejności.
Możesz określić jedną z następujących opcji:
  • Ciąg: "sequenceNum"
  • Funkcja col() Spark SQL: col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Określona kolumna musi być sortowalnym typem danych.
Ten parametr jest wymagany.
ignore_null_updates
Typ: bool
Umożliwiaj przyjmowanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates jest True, kolumny z null zachowują istniejące wartości w obiekcie docelowym. To również dotyczy kolumn zagnieżdżonych o wartości null. Gdy ignore_null_updates jest False, istniejące wartości są zastępowane wartościami null.
Ten parametr jest opcjonalny.
Wartość domyślna to False.
apply_as_deletes
Typ: str lub expr()
Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE, a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako tzw. "tombstone" w bazowej tabeli Delta, a w metastore tworzony jest widok, który filtruje te wpisy "tombstone". Interwał przechowywania można skonfigurować za pomocą
pipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.
Możesz określić jedną z następujących opcji:
  • Ciąg: "Operation = 'DELETE'"
  • Funkcja expr() Spark SQL: expr("Operation = 'DELETE'")

Ten parametr jest opcjonalny.
apply_as_truncates
Typ: str lub expr()
Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Ponieważ ta klauzula wyzwala całkowite usunięcie danych z tabeli docelowej, powinna być używana tylko w konkretnych przypadkach użycia wymagających tej funkcjonalności.
Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania.
Możesz określić jedną z następujących opcji:
  • Ciąg: "Operation = 'TRUNCATE'"
  • Funkcja expr() Spark SQL: expr("Operation = 'TRUNCATE'")

Ten parametr jest opcjonalny.
column_list
except_column_list
Typ: list
Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list, aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list, aby określić kolumny do wykluczenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():
  • column_list = ["userId", "name", "city"].
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Ten parametr jest opcjonalny.
Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument column_list lub except_column_list.
stored_as_scd_type
Typ: str lub int
Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.
Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.
Ta klauzula jest opcjonalna.
Wartość domyślna to SCD typ 1.
track_history_column_list
track_history_except_column_list
Typ: list
Podzbiór kolumn wyjściowych, które mają być śledzone pod kątem historii w tabeli docelowej. Użyj track_history_column_list, aby określić pełną listę kolumn do śledzenia. Użyj
track_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Ten parametr jest opcjonalny.
Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy brak track_history_column_list lub...
track_history_except_column_list argument jest przekazywany do funkcji.

zmienianie przechwytywania danych z migawek bazy danych za pomocą języka Python w technologii DLT

Ważny

Interfejs API APPLY CHANGES FROM SNAPSHOT znajduje się w fazie Public Preview dla użytkowników publicznych.

Użyj funkcji apply_changes_from_snapshot() w interfejsie API języka Python, aby używać funkcji przechwytywania zmian DLT (CDC) do przetwarzania danych źródłowych z migawek bazy danych.

Ważny

Aby zastosować zmiany, należy zadeklarować tabelę docelową przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes_from_snapshot() należy również uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pole sequence_by.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Uwaga

W przypadku przetwarzania APPLY CHANGES FROM SNAPSHOT domyślne zachowanie polega na wstawieniu nowego wiersza, gdy pasujący rekord z tymi samymi kluczami nie istnieje w obiekcie docelowym. Jeśli istnieje pasujący rekord, jest aktualizowany tylko wtedy, gdy którakolwiek z wartości w wierszu uległa zmianie. Wiersze z kluczami obecnymi w obiekcie docelowym, ale już nieobecnymi w źródle, są usuwane.

Aby dowiedzieć się więcej o przetwarzaniu CDC za pomocą migawek, zobacz Interfejsy API DO ZASTOSOWANIA ZMIAN: Uproszczenie przechwytywania zmian danych za pomocą DLT. Przykłady użycia funkcji apply_changes_from_snapshot() można znaleźć w przykładach okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.

Argumenty
target
Typ: str
Nazwa tabeli, którą należy zaktualizować. Możesz użyć funkcji create_streaming_table(), aby utworzyć tabelę docelową przed uruchomieniem funkcji apply_changes().
Ten parametr jest wymagany.
source
Typ: str lub lambda function
Albo nazwa tabeli lub widoku do tworzenia migawek w regularnych odstępach czasu, albo funkcja lambda w języku Python, która zwraca ramkę danych migawek do przetworzenia oraz wersję migawki. Zobacz Implementuj argument source.
Ten parametr jest wymagany.
keys
Typ: list
Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.
Możesz określić jedną z następujących opcji:
  • Lista ciągów: ["userId", "orderId"]
  • Lista funkcji usługi Spark SQL col(): [col("userId"), col("orderId"]

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Ten parametr jest wymagany.
stored_as_scd_type
Typ: str lub int
Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.
Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.
Ta klauzula jest opcjonalna.
Wartość domyślna to SCD typ 1.
track_history_column_list
track_history_except_column_list
Typ: list
Podzbiór kolumn wyjściowych, które mają być śledzone pod kątem historii w tabeli docelowej. Użyj track_history_column_list, aby określić pełną listę kolumn do śledzenia. Użyj
track_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).
Ten parametr jest opcjonalny.
Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy brak track_history_column_list lub...
track_history_except_column_list argument jest przekazywany do funkcji.

Implementowanie argumentu source

Funkcja apply_changes_from_snapshot() zawiera argument source. W przypadku przetwarzania migawek historycznych argument source powinien być funkcją lambda języka Python, która zwraca dwie wartości do funkcji apply_changes_from_snapshot(): ramkę danych języka Python zawierającą dane migawki do przetworzenia i wersję migawki.

Poniżej znajduje się podpis funkcji lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentem funkcji lambda jest najnowsza przetworzona wersja migawki.
  • Wartość zwracana funkcji lambda jest None lub krotka dwóch wartości: Pierwsza wartość krotki to ramka danych zawierająca migawkę do przetworzenia. Drugą wartością krotki jest wersja migawki, która reprezentuje logiczną kolejność tej migawki.

Przykład, który implementuje i wywołuje funkcję lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Środowisko uruchomieniowe DLT wykonuje następujące kroki za każdym razem, gdy pipeline zawierający funkcję apply_changes_from_snapshot() zostaje uruchomiony:

  1. Uruchamia funkcję next_snapshot_and_version, aby załadować następną ramkę migawki i odpowiadającą jej wersję migawki.
  2. Jeśli nie zostanie zwrócony żaden DataFrame, proces zostanie zakończony, a aktualizacja pipeline'u zostanie ukończona.
  3. Wykrywa zmiany w nowym zrzucie i przyrostowo stosuje je do tabeli docelowej.
  4. Wraca do kroku nr 1, aby załadować następną migawkę i jej wersję.

Ograniczenia

Interfejs DLT w języku Python ma następujące ograniczenie:

Funkcja pivot() nie jest obsługiwana. Operacja pivot na platformie Spark wymaga ładowania danych wejściowych w trybie gorliwym w celu obliczenia schematu wyjściowego. Ta funkcja nie jest obsługiwana w technologii DLT.