Udostępnij za pośrednictwem


Dokumentacja języka PYTHON DLT

Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania języka Python DLT.

Aby uzyskać informacje na temat interfejsu API SQL, zobacz dokumentację języka SQL DLT.

Aby uzyskać szczegółowe informacje dotyczące konfigurowania automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.

Przed rozpoczęciem

Poniżej przedstawiono ważne zagadnienia dotyczące implementowania potoków za pomocą interfejsu JĘZYKA Python DLT:

  • Ponieważ funkcje table() i view() języka Python są wywoływane wiele razy podczas planowania i uruchamiania aktualizacji potoku, nie dołączaj kodu w jednej z tych funkcji, które mogą mieć skutki uboczne (na przykład kod modyfikujący dane lub wysyłający wiadomość e-mail). Aby uniknąć nieoczekiwanego zachowania, funkcje języka Python definiujące zestawy danych powinny zawierać tylko kod wymagany do zdefiniowania tabeli lub widoku.
  • Aby wykonywać operacje, takie jak wysyłanie wiadomości e-mail lub integrowanie z zewnętrzną usługą monitorowania, użyj haków zdarzeń, szczególnie w funkcjach definiujących zestawy danych. Zaimplementowanie tych operacji w funkcjach definiujących zestawy danych spowoduje nieoczekiwane zachowanie.
  • Funkcje table i view języka Python muszą zwracać ramkę danych. Niektóre funkcje działające na ramkach danych nie zwracają ramek danych i nie powinny być używane. Te operacje obejmują funkcje, takie jak collect(), count(), toPandas(), save()i saveAsTable(). Ponieważ transformacje ramki danych są wykonywane po, gdy został rozwiązany graf pełnego przepływu danych, użycie takich operacji może mieć niezamierzone skutki uboczne.

Importowanie modułu dlt Python

Funkcje języka Python DLT są definiowane w module dlt. Potoki zaimplementowane za pomocą interfejsu API języka Python muszą zaimportować ten moduł:

import dlt

Utwórz zmaterializowany widok DLT lub tabelę przesyłania strumieniowego

W języku Python DLT określa, czy zestaw danych ma być aktualizowany jako zmaterializowany widok, czy też tabela przesyłania strumieniowego w oparciu o definiujące zapytanie. Dekoratora @table można używać zarówno do definiowania zmaterializowanych widoków, jak i tabel strumieniowych.

Aby zdefiniować zmaterializowany widok w języku Python, zastosuj @table do zapytania wykonującego statyczny odczyt względem źródła danych. Aby zdefiniować tabelę przesyłania strumieniowego, zastosuj @table do zapytania, które wykonuje odczyt strumieniowy względem źródła danych lub użyj funkcji create_streaming_table(). Oba typy zestawów danych mają tę samą specyfikację składni w następujący sposób:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Tworzenie widoku DLT

Aby zdefiniować widok w języku Python, zastosuj dekorator @view. Podobnie jak dekorator @table, można użyć widoków w technologii DLT dla zestawów danych statycznych lub przesyłanych strumieniowo. Poniżej przedstawiono składnię definiującą widoki przy użyciu języka Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Przykład: Definiowanie tabel i widoków

Aby zdefiniować tabelę lub widok w języku Python, zastosuj @dlt.view lub dekorator @dlt.table do funkcji. Możesz użyć nazwy funkcji lub parametru name, aby przypisać tabelę lub nazwę widoku. Poniższy przykład definiuje dwa różne zestawy danych: widok o nazwie taxi_raw, który przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data, która przyjmuje widok taxi_raw jako dane wejściowe:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Przykład: uzyskiwanie dostępu do zestawu danych zdefiniowanego w tym samym potoku

Uwaga

Mimo że funkcje dlt.read() i dlt.read_stream() są nadal dostępne i w pełni obsługiwane przez interfejs DLT w języku Python, usługa Databricks zaleca zawsze używanie funkcji spark.read.table() i spark.readStream.table() z następujących powodów:

  • Funkcje spark obsługują odczytywanie zarówno wewnętrznych, jak i zewnętrznych zestawów danych, w tym tych zapisanych w magazynie zewnętrznym lub zdefiniowanych w innych potokach. Funkcje dlt obsługują tylko odczytywanie wewnętrznych zestawów danych.
  • Funkcje spark obsługują określanie opcji, takich jak skipChangeCommits, w celu odczytu operacji. Określanie opcji nie jest obsługiwane przez funkcje dlt.

Aby uzyskać dostęp do zestawu danych zdefiniowanego w tym samym potoku, użyj funkcji spark.read.table() lub spark.readStream.table().

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Notatka

Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku można bezpośrednio określić wykaz i schemat lub użyć wartości domyślnych skonfigurowanych w potoku. W tym przykładzie tabela customersjest zapisywana i odczytywana z domyślnego katalogu i schematu skonfigurowanych dla twojego potoku.

Przykład: odczyt z tabeli zarejestrowanej w magazynie metadanych

Aby odczytać dane z tabeli zarejestrowanej w magazynie metadanych Hive, w argumencie funkcji można zakwalifikować nazwę tabeli przy użyciu nazwy bazy danych:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Aby zapoznać się z przykładem odczytu z tabeli w katalogu Unity, zobacz Pozyskiwanie danych do potoku katalogu Unity.

przykład : uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql

Zestaw danych można również zwrócić przy użyciu wyrażenia spark.sql w funkcji zapytania. Aby odczytać z wewnętrznego zestawu danych, możesz pozostawić nazwę bez kwalifikacji, aby użyć domyślnego wykazu i schematu, lub można je wstępnie utworzyć:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Trwale usuń rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego

Aby trwale usunąć rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach delty obiektu. Aby zapewnić usunięcie rekordów z zmaterializowanego widoku, zobacz Trwałe usuwanie rekordów z zmaterializowanego widoku z włączonymi wektorami usuwania. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.

zapisywanie w zewnętrznych usługach przesyłania strumieniowego zdarzeń lub w tabelach delta przy użyciu interfejsu API sink DLT

Ważny

Interfejs API sink DLT znajduje się w publicznej wersji zapoznawczej .

Notatka

  • Uruchomienie aktualizacji pełnej odświeżania nie powoduje wyczyszczenia danych z ujść. Wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
  • Oczekiwania DLT nie są obsługiwane w API sink.

Aby zapisać dane do usługi przesyłania strumieniowego zdarzeń, takiej jak Apache Kafka lub Azure Event Hubs, bądź do tabeli Delta z potoku DLT, użyj funkcji create_sink() zawartej w module dlt Python. Po utworzeniu ujścia za pomocą funkcji create_sink() należy użyć ujścia w przepływie dołączania do zapisywania danych w ujściu. Przepływ dołączania jest jedynym typem przepływu obsługiwanym przez funkcję create_sink(). Inne typy przepływów, takie jak apply_changes, nie są obsługiwane.

Poniżej przedstawiono składnię umożliwiającą utworzenie ujścia za pomocą funkcji create_sink():

create_sink(<sink_name>, <format>, <options>)
Argumenty
name

Typ: str

Ciąg, który identyfikuje ujście i służy do odwoływania się do niego oraz zarządzania nim. Nazwy wyjść muszą być unikatowe dla pipeline'u, we wszystkich fragmentach kodu źródłowego, takich jak notatniki lub moduły będące częścią pipeline'u.

Ten parametr jest wymagany.
format

Typ: str

Ciąg definiujący format danych wyjściowych kafka lub delta.

Ten parametr jest wymagany.
options

Typ: dict

Opcjonalna lista opcji ujścia, sformatowana jako {"key": "value"}, gdzie zarówno klucz, jak i wartość są ciągami znaków. Obsługiwane są wszystkie opcje środowiska Databricks Runtime, które są obsługiwane przez wyjścia Kafka i Delta. Aby uzyskać informacje o opcjach Kafka, zobacz Konfigurowanie zapisywania przesyłania strumieniowego w Kafka. Aby uzyskać informacje o opcjach funkcji delta, zobacz tabelę delty jako ujście.

Przykład: tworzenie sinku dla platformy Kafka za pomocą funkcji create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Przykład: tworzenie ujścia delty za pomocą funkcji create_sink() i ścieżki systemu plików

Poniższy przykład tworzy odbiornik danych, który zapisuje do tabeli Delta, przekazując ścieżkę systemu plików do tej tabeli.

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

Przykład: tworzenie ujścia Delta za pomocą funkcji create_sink() oraz nazwy tabeli w Unity Catalog

Notatka

Zlew Delta supportuje zewnętrzne i zarządzane tabele Unity Catalog oraz zarządzane tabele w magazynie metadanych Hive. Nazwy tabel muszą być jednoznacznie określone. Na przykład tabele Unity Catalog muszą używać identyfikatora trójpoziomowego: <catalog>.<schema>.<table>. Tabele metadanych Hive muszą używać <schema>.<table>.

Poniższy przykład tworzy ujście, które zapisuje w tabeli Delta, podając nazwę tabeli w katalogu Unity.

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Przykład: użycie przepływu dołączania do pisania do delta sinka

Poniższy przykład tworzy miejsce docelowe, które zapisuje dane do tabeli Delta, a następnie tworzy proces dołączania, aby zapisywać w tym miejscu docelowym.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

Przykład: używanie przepływu dołączania do zapisu w ujściu platformy Kafka

Poniższy przykład tworzy ujście, które zapisuje w temacie platformy Kafka, a następnie tworzy przepływ dołączania do zapisu w tym ujściu:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Schemat ramki danych zapisanej na platformie Kafka powinien zawierać kolumny określone w Konfigurowanie składnika zapisywania przesyłania strumieniowego ze strukturą platformy Kafka.

Utwórz tabelę do użycia jako element docelowy operacji przesyłania strumieniowego

Użyj funkcji create_streaming_table(), aby utworzyć tabelę docelową dla rekordów generowanych przez operacje przesyłania strumieniowego, w tym apply_changes(), apply_changes_from_snapshot()i @append_flow.

Notatka

Funkcje create_target_table() i create_streaming_live_table() są przestarzałe. Usługa Databricks zaleca aktualizowanie istniejącego kodu w celu korzystania z funkcji create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumenty
name

Typ: str

Nazwa tabeli.

Ten parametr jest wymagany.
comment

Typ: str

Opcjonalny opis tabeli.
spark_conf

Typ: dict

Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties

Typ: dict

Opcjonalna lista właściwości tabeli dla tabeli.
partition_cols

Typ: array

Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
cluster_by

Typ: array

Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.

Zobacz Użyj klastrowania cieczy dla tabel Delta.
path

Typ: str

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku.
schema

Typ: str lub StructType

Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Opcjonalne ograniczenia dotyczące jakości danych dla tabeli. Zobacz różne oczekiwania.
row_filter (publiczna wersja zapoznawcza)

Typ: str

Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Kontrolowanie sposobu materializacji tabel

Tabele oferują również dodatkową kontrolę nad ich materializacją:

  • Określ sposób tabel klastra przy użyciu cluster_by. Możesz użyć płynnego klastrowania, aby przyspieszyć zapytania. Zobacz Użyj klastrowania płynnego dla tabel Delta.
  • Określ sposób partycjonowania tabel przy użyciu partition_cols.
  • Właściwości tabeli można ustawić podczas definiowania widoku lub tabeli. Zobacz właściwości tabeli DLT .
  • Ustaw lokalizację przechowywania danych tabeli przy użyciu ustawienia path. Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśli path nie jest ustawiona.
  • W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określ schemat i kolumny klastrowe.

Notatka

W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca, aby DLT kontrolowało organizację danych. Nie należy określać kolumn partycji, chyba że spodziewasz się, że tabela przekroczy terabajt.

Przykład: określić schemat i kolumny klastra

Opcjonalnie można określić schemat tabeli przy użyciu języka Python StructType lub ciągu DDL SQL. Gdy zdefiniowano ciąg DDL, definicja może zawierać kolumny wygenerowane .

Poniższy przykład tworzy tabelę o nazwie sales ze schematem określonym przy użyciu StructTypejęzyka Python:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumny klastrowania:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

Domyślnie biblioteka DLT wywnioskuje schemat z definicji table, jeśli nie określisz schematu.

Przykład: określanie kolumn partycji

Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumnę partycji:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Przykład: Definiowanie ograniczeń tabeli

Ważny

Ograniczenia tabel znajdują się w wersji publicznej zapoznawczej .

Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.

W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Przykład: Definiowanie filtru wierszy i maski kolumn

Ważny

Filtry wierszy i maski kolumn znajdują się w publicznej wersji zapoznawczej.

Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy oraz maską kolumn, użyj klauzuli ROW FILTER oraz klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Skonfiguruj tabelę streamingową, aby ignorować zmiany w źródłowej tabeli streamingowej

Notatka

  • Flaga skipChangeCommits działa tylko z spark.readStream przy użyciu funkcji option(). Nie można użyć tej flagi w funkcji dlt.read_stream().
  • Nie można użyć flagi skipChangeCommits, gdy źródłowa tabela przesyłania strumieniowego jest zdefiniowana jako cel funkcji apply_changes() .

Domyślnie tabele przesyłania strumieniowego wymagają źródeł tylko do dołączania. Jeśli tabela przesyłania strumieniowego używa innej tabeli przesyłania strumieniowego jako źródła, a źródłowa tabela przesyłania strumieniowego wymaga aktualizacji lub usunięcia, na przykład przetwarzania RODO "prawo do zapomnienia", podczas odczytywania źródłowej tabeli przesyłania strumieniowego można ustawić flagę skipChangeCommits w celu zignorowania tych zmian. Aby uzyskać więcej informacji na temat tej flagi, zobacz Ignoruj aktualizacje i usuwa.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

właściwości DLT w Pythonie

W poniższych tabelach opisano opcje i właściwości, które można określić podczas definiowania tabel i widoków za pomocą biblioteki DLT:

@table lub @view
name

Typ: str

Opcjonalna nazwa tabeli lub widoku. Jeśli nie jest zdefiniowana, nazwa funkcji jest używana jako nazwa tabeli lub widoku.
comment

Typ: str

Opcjonalny opis tabeli.
spark_conf

Typ: dict

Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties

Typ: dict

Opcjonalna lista właściwości tabeli dla tabeli.
path

Typ: str

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiony, system domyślnie przyjmuje lokalizację przechowywania potoku.
partition_cols

Typ: a collection of str

Opcjonalna kolekcja, na przykład list z co najmniej jednej kolumny do partycjonowania tabeli.
cluster_by

Typ: array

Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.

Zobacz Użyj klastrowania cieczy dla tabel Delta.
schema

Typ: str lub StructType

Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType.
temporary

Typ: bool

Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Słowo kluczowe temporary instruuje DLT o utworzeniu tabeli, która jest dostępna dla potoku, ale nie powinna być używana poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który ją tworzy, a nie tylko pojedynczej aktualizacji.

Wartość domyślna to "False".
row_filter (publiczna wersja zapoznawcza)

Typ: str

Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.
Definicja tabeli lub widoku
def <function-name>()

Funkcja języka Python, która definiuje zestaw danych. Jeśli parametr name nie jest ustawiony, <function-name> jest używana jako nazwa docelowego zestawu danych.
query

Instrukcja Spark SQL zwracająca zestaw danych Platformy Spark lub ramkę danych Koalas.

Użyj dlt.read() lub spark.read.table(), aby wykonać pełny odczyt z zestawu danych zdefiniowanego w tym samym potoku. Aby odczytać zewnętrzny zestaw danych, użyj funkcji spark.read.table(). Nie można użyć dlt.read() do odczytywania zewnętrznych zestawów danych. Ponieważ spark.read.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read().

Podczas definiowania zestawu danych w potoku domyślnie będzie on używać wykazu i schematu zdefiniowanego w konfiguracji potoku. Możesz użyć funkcji spark.read.table() do odczytu z zestawu danych zdefiniowanego w ramach procesu bez kwalifikacji. Aby na przykład odczytać z zestawu danych o nazwie customers:

spark.read.table("customers")

Możesz również użyć funkcji spark.read.table() do odczytu z tabeli zarejestrowanej w magazynie metadanych, opcjonalnie kwalifikując nazwę tabeli o nazwie bazy danych:

spark.read.table("sales.customers")

Użyj dlt.read_stream() lub spark.readStream.table(), aby wykonać odczyt strumieniowy z zestawu danych zdefiniowanego w tym samym potoku. Aby wykonać odczyt strumieniowy z zewnętrznego zestawu danych, użyj
funkcja spark.readStream.table(). Ponieważ spark.readStream.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read_stream().

Aby zdefiniować zapytanie w funkcji table DLT przy użyciu składni JĘZYKA SQL, użyj funkcji spark.sql. Zobacz Przykład: uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql. Aby zdefiniować zapytanie w funkcji table DLT przy użyciu języka Python, użyj składni PySpark.
Oczekiwania
@expect("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_or_drop("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_or_fail("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza warunki, natychmiast zatrzymaj wykonywanie.
@expect_all(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_all_or_drop(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_all_or_fail(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, natychmiast zatrzymaj wykonywanie.

przechwytywanie danych ze zestawienia zmian za pomocą języka Python w technologii DLT

Użyj funkcji apply_changes() w interfejsie API języka Python, aby używać funkcjonalności przechwytywania zmian danych dla DLT (CDC) do przetwarzania danych źródłowych z kanału danych o zmianach (CDF).

Ważny

Aby zastosować zmiany, należy zadeklarować tabelę docelową przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes() należy uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pola sequence_by.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Notatka

W przypadku przetwarzania APPLY CHANGES domyślne zachowanie INSERT i zdarzeń UPDATE polega na upsert zdarzeń CDC ze źródła: zaktualizuj wszystkie wiersze w tabeli docelowej zgodne z określonymi kluczami lub wstaw nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić przy użyciu warunku APPLY AS DELETE WHEN.

Aby dowiedzieć się więcej na temat przetwarzania CDC z użyciem strumienia zmian, zobacz Interfejsy API APPLY CHANGES: uproszczenie przechwytywania zmian danych przy użyciu DLT. Aby zapoznać się z przykładem użycia funkcji apply_changes(), zobacz Przykład: przetwarzanie dla typów SCD 1 i SCD 2 przy użyciu danych źródłowych CDF.

Ważny

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes należy uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pole sequence_by.

Zobacz Interfejsy API APPLY CHANGES: upraszczanie przechwytywania zmian danych za pomocą DLT.

Argumenty
target

Typ: str

Nazwa tabeli, którą należy zaktualizować. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem funkcji apply_changes().

Ten parametr jest wymagany.
source

Typ: str

Źródło danych zawierające rekordy CDC.

Ten parametr jest wymagany.
keys

Typ: list

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Możesz określić jedną z następujących opcji:

- Lista ciągów: ["userId", "orderId"]
— Lista funkcji usługi Spark SQL col(): [col("userId"), col("orderId"]

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Ten parametr jest wymagany.
sequence_by

Typ: str lub col()

Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Biblioteka DLT używa tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają z niewłaściwej kolejności.

Możesz określić jedną z następujących opcji:

- Ciąg: "sequenceNum"
- Funkcja col() Spark SQL: col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Określona kolumna musi być sortowalnym typem danych.

Ten parametr jest wymagany.
ignore_null_updates

Typ: bool

Umożliwiaj przyjmowanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates jest True, kolumny z null zachowują istniejące wartości w obiekcie docelowym. To również dotyczy kolumn zagnieżdżonych o wartości null. Gdy ignore_null_updates jest False, istniejące wartości są zastępowane wartościami null.

Ten parametr jest opcjonalny.

Wartość domyślna to False.
apply_as_deletes

Typ: str lub expr()

Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE, a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako znacznik w bazowej tabeli Delta, a widok jest tworzony w metastore, który filtruje te znaczniki. Interwał przechowywania można skonfigurować za pomocą polecenia
pipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.

Możesz określić jedną z następujących opcji:

- Ciąg: "Operation = 'DELETE'"
- Funkcja expr() Spark SQL: expr("Operation = 'DELETE'")

Ten parametr jest opcjonalny.
apply_as_truncates

Typ: str lub expr()

Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.

Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji ucięcia.

Możesz określić jedną z następujących opcji:

- Ciąg: "Operation = 'TRUNCATE'"
- Funkcja expr() Spark SQL: expr("Operation = 'TRUNCATE'")

Ten parametr jest opcjonalny.
column_list

except_column_list

Typ: list

Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list, aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list, aby określić kolumny do wykluczenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument column_list lub except_column_list.
stored_as_scd_type

Typ: str lub int

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podzbiór kolumn wyjściowych, które mają być śledzone pod kątem historii w tabeli docelowej. Użyj track_history_column_list, aby określić pełną listę kolumn do śledzenia. Używać
track_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy brak track_history_column_list lub...
track_history_except_column_list argument jest przekazywany do funkcji.

zmienianie przechwytywania danych z migawek bazy danych za pomocą języka Python w technologii DLT

Ważny

Interfejs API APPLY CHANGES FROM SNAPSHOT znajduje się w wersji zapoznawczej dla użytkowników publicznych.

Użyj funkcji apply_changes_from_snapshot() w interfejsie API języka Python, aby używać funkcji przechwytywania zmian DLT (CDC) do przetwarzania danych źródłowych z migawek bazy danych.

Ważny

Aby wprowadzić zmiany, należy zdeklarować docelową tabelę do przesyłania danych. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes_from_snapshot() należy również uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pole sequence_by.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Notatka

W przypadku przetwarzania APPLY CHANGES FROM SNAPSHOT domyślne zachowanie polega na wstawieniu nowego wiersza, gdy pasujący rekord z tymi samymi kluczami nie istnieje w obiekcie docelowym. Jeśli istnieje pasujący rekord, jest aktualizowany tylko wtedy, gdy którakolwiek z wartości w wierszu uległa zmianie. Wiersze z kluczami obecnymi w obiekcie docelowym, ale już nieobecnymi w źródle, są usuwane.

Aby dowiedzieć się więcej o przetwarzaniu CDC za pomocą migawek, zobacz Interfejsy API DO ZASTOSOWANIA ZMIAN: Uproszczenie przechwytywania zmian danych za pomocą DLT. Przykłady użycia funkcji apply_changes_from_snapshot() można znaleźć w przykładach okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.

Argumenty
target

Typ: str

Nazwa tabeli do zaktualizowania. Możesz użyć funkcji create_streaming_table(), aby utworzyć tabelę docelową przed uruchomieniem funkcji apply_changes().

Ten parametr jest wymagany.
source

Typ: str lub lambda function

Albo nazwa tabeli lub widoku do przechwytywania okresowego, albo funkcja lambda w języku Python, która zwraca ramkę danych do przetworzenia oraz wersję migawki. Zobacz Implementuj argument source.

Ten parametr jest wymagany.
keys

Typ: list

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Możesz określić jedną z następujących opcji:

- Lista ciągów: ["userId", "orderId"]
— Lista funkcji usługi Spark SQL col(): [col("userId"), col("orderId"]

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Ten parametr jest wymagany.
stored_as_scd_type

Typ: str lub int

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podzbiór kolumn wyjściowych do śledzenia historii w tabeli docelowej. Użyj track_history_column_list, aby określić pełną listę kolumn do śledzenia. Użyj
track_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col():
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId), ale nie można użyć col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, jeśli nie track_history_column_list lub
track_history_except_column_list argument jest przekazywany do funkcji.

Implementowanie argumentu source

Funkcja apply_changes_from_snapshot() zawiera argument source. W przypadku przetwarzania migawek historycznych argument source powinien być funkcją lambda języka Python, która zwraca dwie wartości do funkcji apply_changes_from_snapshot(): ramkę danych języka Python zawierającą dane migawki do przetworzenia i wersję migawki.

Poniżej znajduje się podpis funkcji lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentem funkcji lambda jest najnowsza przetworzona wersja migawki.
  • Wartość zwracana funkcji lambda jest None lub krotka dwóch wartości: Pierwsza wartość krotki to ramka danych zawierająca migawkę do przetworzenia. Drugą wartością krotki jest wersja migawki, która reprezentuje kolejność logiczną migawki.

Przykład, który implementuje i wywołuje funkcję lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Środowisko uruchomieniowe DLT wykonuje następujące kroki za każdym razem, gdy pipeline zawierający funkcję apply_changes_from_snapshot() zostaje uruchomiony:

  1. Uruchamia funkcję next_snapshot_and_version, aby załadować następną ramkę migawki i odpowiadającą jej wersję migawki.
  2. Jeśli nie zostanie zwrócony żaden DataFrame, proces zostanie zakończony, a aktualizacja pipeline'u zostanie ukończona.
  3. Wykrywa zmiany w nowym zrzucie i przyrostowo stosuje je do tabeli docelowej.
  4. Wraca do kroku nr 1, aby załadować następną migawkę i jej wersję.

Ograniczenia

Interfejs DLT w języku Python ma następujące ograniczenie:

Funkcja pivot() nie jest obsługiwana. Operacja pivot na platformie Spark wymaga ładowania danych wejściowych w trybie gorliwym w celu obliczenia schematu wyjściowego. Ta funkcja nie jest obsługiwana w technologii DLT.