Udostępnij za pośrednictwem


Dokumentacja języka Python w tabelach delta Live Tables

Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania Delta Live Tables w języku Python.

Aby uzyskać informacje na temat interfejsu API SQL, zobacz dokumentację języka SQL delta Live Tables.

Aby uzyskać szczegółowe informacje dotyczące konfigurowania automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.

Przed rozpoczęciem

Poniżej przedstawiono ważne zagadnienia dotyczące implementowania potoków za pomocą interfejsu języka Python delta Live Tables:

  • Ponieważ język Python table() i view() funkcje są wywoływane wielokrotnie podczas planowania i uruchamiania aktualizacji potoku, nie dołączaj kodu do jednej z tych funkcji, które mogą mieć skutki uboczne (na przykład kod modyfikujący dane lub wysyłający wiadomość e-mail). Aby uniknąć nieoczekiwanego zachowania, funkcje języka Python definiujące zestawy danych powinny zawierać tylko kod wymagany do zdefiniowania tabeli lub widoku.
  • Aby wykonywać operacje, takie jak wysyłanie wiadomości e-mail lub integrowanie z zewnętrzną usługą monitorowania, szczególnie w funkcjach definiujących zestawy danych, użyj punktów zaczepienia zdarzeń. Zaimplementowanie tych operacji w funkcjach definiujących zestawy danych spowoduje nieoczekiwane zachowanie.
  • Język Python table i view funkcje muszą zwracać ramkę danych. Niektóre funkcje działające na ramkach danych nie zwracają ramek danych i nie powinny być używane. Te operacje obejmują funkcje, takie jak collect(), , count()toPandas(), save()i saveAsTable(). Ponieważ przekształcenia ramki danych są wykonywane po rozwiązaniu pełnego grafu przepływu danych, użycie takich operacji może mieć niezamierzone skutki uboczne.

Importowanie modułu dlt języka Python

Funkcje języka Python w tabelach na żywo funkcji delta są definiowane w dlt module. Potoki zaimplementowane za pomocą interfejsu API języka Python muszą zaimportować ten moduł:

import dlt

Tworzenie tabel delta live zmaterializowanego widoku lub tabeli przesyłania strumieniowego

W języku Python funkcja Delta Live Tables określa, czy zestaw danych ma być aktualizowany jako zmaterializowany widok, czy tabela przesyłania strumieniowego na podstawie definiującego zapytania. Dekorator @table może służyć do definiowania zarówno zmaterializowanych widoków, jak i tabel przesyłania strumieniowego.

Aby zdefiniować zmaterializowany widok w języku Python, zastosuj do @table zapytania, które wykonuje statyczny odczyt względem źródła danych. Aby zdefiniować tabelę przesyłania strumieniowego, zastosuj się @table do zapytania, które wykonuje odczyt strumieniowy względem źródła danych lub użyj funkcji create_streaming_table(). Oba typy zestawów danych mają tę samą specyfikację składni w następujący sposób:

Uwaga

Aby użyć argumentu cluster_by w celu włączenia klastrowania liquid, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Tworzenie widoku tabel na żywo delty

Aby zdefiniować widok w języku Python, zastosuj @view dekorator. @table Podobnie jak dekorator, można użyć widoków w tabelach delta live dla zestawów danych statycznych lub przesyłanych strumieniowo. Poniżej przedstawiono składnię definiującą widoki przy użyciu języka Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Przykład: Definiowanie tabel i widoków

Aby zdefiniować tabelę lub widok w języku Python, zastosuj @dlt.view funkcję lub @dlt.table dekorator. Możesz użyć nazwy funkcji lub parametru name , aby przypisać tabelę lub nazwę widoku. W poniższym przykładzie zdefiniowano dwa różne zestawy danych: widok o nazwie taxi_raw , który przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data , która przyjmuje taxi_raw widok jako dane wejściowe:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

Przykład: uzyskiwanie dostępu do zestawu danych zdefiniowanego w tym samym potoku

Uwaga

dlt.read() Mimo że funkcje i dlt.read_stream() są nadal dostępne i w pełni obsługiwane przez interfejs języka Python funkcji Delta Live Tables, usługa Databricks zaleca zawsze używanie spark.read.table() funkcji i spark.readStream.table() z następujących powodów:

  • Funkcje spark obsługują odczytywanie wewnętrznych i zewnętrznych zestawów danych, w tym zestawów danych w magazynie zewnętrznym lub zdefiniowanych w innych potokach. Funkcje dlt obsługują tylko odczytywanie wewnętrznych zestawów danych.
  • Funkcje spark obsługują określanie opcji, takich jak skipChangeCommits, do odczytu operacji. Określanie opcji nie jest obsługiwane przez dlt funkcje.

Aby uzyskać dostęp do zestawu danych zdefiniowanego w tym samym potoku, użyj spark.read.table() funkcji lub spark.readStream.table() , poprzedzając LIVE słowo kluczowe nazwą zestawu danych:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("LIVE.customers_raw").where(...)

Przykład: odczyt z tabeli zarejestrowanej w magazynie metadanych

Aby odczytać dane z tabeli zarejestrowanej w magazynie metadanych Hive, w argumencie funkcji pomiń LIVE słowo kluczowe i opcjonalnie zakwalifikuj nazwę tabeli o nazwie bazy danych:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Aby zapoznać się z przykładem odczytu z tabeli wykazu aparatu Unity, zobacz Pozyskiwanie danych do potoku wykazu aparatu Unity.

Przykład: uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql

Zestaw danych można również zwrócić przy użyciu spark.sql wyrażenia w funkcji zapytania. Aby odczytać z wewnętrznego zestawu danych, należy LIVE. wstępnie otworzyć nazwę zestawu danych:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Tworzenie tabeli, która ma być używana jako element docelowy operacji przesyłania strumieniowego

create_streaming_table() Użyj funkcji , aby utworzyć tabelę docelową dla rekordów wyjściowych przez operacje przesyłania strumieniowego, w tym apply_changes(), apply_changes_from_snapshot()i rekordy wyjściowe @append_flow.

Uwaga

Funkcje create_target_table() i create_streaming_live_table() są przestarzałe. Usługa Databricks zaleca aktualizowanie istniejącego create_streaming_table() kodu w celu korzystania z funkcji.

Uwaga

Aby użyć argumentu cluster_by w celu włączenia klastrowania liquid, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumenty
name

Typ: str

Nazwa tabeli.

Ten parametr jest wymagany.
comment

Typ: str

Opcjonalny opis tabeli.
spark_conf

Typ: dict

Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties

Typ: dict

Opcjonalna lista właściwości tabeli.
partition_cols

Typ: array

Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
cluster_by

Typ: array

Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.

Zobacz Użyj płynnego klastrowania dla tabel typu Delta).
path

Typ: str

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku.
schema

Typ: str lub StructType

Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Opcjonalne ograniczenia dotyczące jakości danych dla tabeli. Zobacz wiele oczekiwań.
row_filter (Publiczna wersja zapoznawcza)

Typ: str

Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Kontrolowanie sposobu materializacji tabel

Tabele oferują również dodatkową kontrolę nad ich materializacją:

  • Określ sposób partycjonowania tabel przy użyciu polecenia partition_cols. Możesz użyć partycjonowania, aby przyspieszyć wykonywanie zapytań.
  • Właściwości tabeli można ustawić podczas definiowania widoku lub tabeli. Zobacz Właściwości tabeli Tabele na żywo funkcji Delta.
  • Ustaw lokalizację przechowywania danych tabeli przy użyciu path ustawienia . Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśli path nie są ustawione.
  • W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określanie schematu i kolumn partycji.

Uwaga

W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca umożliwienie usłudze Delta Live Tables kontrolowania organizacji danych. Nie należy określać kolumn partycji, chyba że spodziewasz się, że tabela przekroczy terabajt.

Przykład: określanie schematu i kolumn partycji

Opcjonalnie można określić schemat tabeli przy użyciu języka Python StructType lub ciągu DDL SQL. Po określeniu ciągu DDL definicja może zawierać wygenerowane kolumny.

Poniższy przykład tworzy tabelę o nazwie sales ze schematem określonym przy użyciu języka Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumnę partycji:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Domyślnie tabele na żywo delty wywnioskują schemat z table definicji, jeśli nie określisz schematu.

Konfigurowanie tabeli przesyłania strumieniowego w celu ignorowania zmian w źródłowej tabeli przesyłania strumieniowego

Uwaga

Domyślnie tabele przesyłania strumieniowego wymagają źródeł tylko do dołączania. Gdy tabela przesyłania strumieniowego używa innej tabeli przesyłania strumieniowego jako źródła, a źródłowa tabela przesyłania strumieniowego wymaga aktualizacji lub usunięcia, na przykład przetwarzania RODO "prawo do zapomnienia" skipChangeCommits można ustawić flagę podczas odczytywania źródłowej tabeli przesyłania strumieniowego w celu zignorowania tych zmian. Aby uzyskać więcej informacji na temat tej flagi, zobacz Ignorowanie aktualizacji i usuwania.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Przykład: Definiowanie ograniczeń tabeli

Ważne

Ograniczenia tabel są dostępne w publicznej wersji zapoznawczej.

Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.

W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Przykład: Definiowanie filtru wierszy i maski kolumn

Ważne

Filtry wierszy i maski kolumn są w publicznej wersji zapoznawczej.

Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn, użyj klauzuli ROW FILTER i klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Właściwości tabel delta live języka Python

W poniższych tabelach opisano opcje i właściwości, które można określić podczas definiowania tabel i widoków za pomocą tabel różnicowych na żywo:

Uwaga

Aby użyć argumentu cluster_by w celu włączenia klastrowania liquid, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.

@table lub @view
name

Typ: str

Opcjonalna nazwa tabeli lub widoku. Jeśli nie jest zdefiniowana, nazwa funkcji jest używana jako nazwa tabeli lub widoku.
comment

Typ: str

Opcjonalny opis tabeli.
spark_conf

Typ: dict

Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania.
table_properties

Typ: dict

Opcjonalna lista właściwości tabeli.
path

Typ: str

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku.
partition_cols

Typ: a collection of str

Opcjonalna kolekcja, na przykład jedna list lub więcej kolumn do użycia na potrzeby partycjonowania tabeli.
cluster_by

Typ: array

Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.

Zobacz Użyj płynnego klastrowania dla tabel typu Delta).
schema

Typ: str lub StructType

Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub za pomocą języka Python StructType.
temporary

Typ: bool

Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Słowo temporary kluczowe instruuje tabele delta Live Tables, aby utworzyć tabelę dostępną dla potoku, ale nie należy uzyskiwać dostępu poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który go tworzy, a nie tylko pojedynczą aktualizację.

Wartość domyślna to "False".
row_filter (Publiczna wersja zapoznawcza)

Typ: str

Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.
Definicja tabeli lub widoku
def <function-name>()

Funkcja języka Python, która definiuje zestaw danych. name Jeśli parametr nie jest ustawiony, <function-name> zostanie użyty jako nazwa docelowego zestawu danych.
query

Instrukcja Spark SQL zwracająca zestaw danych Platformy Spark lub ramkę danych Koalas.

Użyj dlt.read() polecenia lub spark.read.table() , aby wykonać pełny odczyt z zestawu danych zdefiniowanego w tym samym potoku. Aby odczytać zewnętrzny zestaw danych, użyj spark.read.table() funkcji . Nie można użyć dlt.read() polecenia do odczytywania zewnętrznych zestawów danych. Ponieważ spark.read.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca używanie go zamiast dlt.read() funkcji.

Gdy używasz spark.read.table() funkcji do odczytu z zestawu danych zdefiniowanego w tym samym potoku, poprzedzając LIVE słowo kluczowe nazwą zestawu danych w argumencie funkcji. Aby na przykład odczytać z zestawu danych o nazwie customers:

spark.read.table("LIVE.customers")

Możesz również użyć spark.read.table() funkcji do odczytu z tabeli zarejestrowanej w magazynie metadanych, pomijając LIVE słowo kluczowe i opcjonalnie kwalifikując nazwę tabeli o nazwie bazy danych:

spark.read.table("sales.customers")

Użyj polecenia dlt.read_stream() lub spark.readStream.table() , aby wykonać odczyt strumieniowy z zestawu danych zdefiniowanego w tym samym potoku. Aby wykonać odczyt strumieniowy z zewnętrznego zestawu danych, użyj elementu
spark.readStream.table() funkcja. Ponieważ spark.readStream.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca używanie go zamiast dlt.read_stream() funkcji.

Aby zdefiniować zapytanie w funkcji Delta Live Tables table przy użyciu składni SQL, użyj spark.sql funkcji . Zobacz Przykład: uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql. Aby zdefiniować zapytanie w funkcji Delta Live Tables table przy użyciu języka Python, użyj składni PySpark .
Oczekiwania
@expect("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_or_drop("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_or_fail("description", "constraint")

Deklarowanie ograniczenia jakości danych zidentyfikowanego przez
description. Jeśli wiersz narusza oczekiwania, natychmiast zatrzymaj wykonywanie.
@expect_all(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, uwzględnij wiersz w docelowym zestawie danych.
@expect_all_or_drop(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, upuść wiersz z docelowego zestawu danych.
@expect_all_or_fail(expectations)

Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych.
expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, natychmiast zatrzymaj wykonywanie.

Zmienianie przechwytywania danych z zestawienia zmian za pomocą języka Python w tabelach delta live tables

apply_changes() Użyj funkcji w interfejsie API języka Python, aby używać funkcji przechwytywania zmian danych w tabelach delta Live Tables (CDC) do przetwarzania danych źródłowych ze źródła danych zmian (CDF).

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu apply_changes() tabeli docelowej należy uwzględnić __START_AT kolumny i __END_AT z tym samym typem danych co sequence_by pola.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Uwaga

W przypadku APPLY CHANGES przetwarzania domyślne zachowanie dla INSERT zdarzeń i UPDATE dotyczy upsert zdarzeń CDC ze źródła: zaktualizować wszystkie wiersze w tabeli docelowej zgodne z określonymi kluczami lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić za pomocą APPLY AS DELETE WHEN warunku.

Aby dowiedzieć się więcej na temat przetwarzania CDC za pomocą zestawienia zmian, zobacz Interfejsy API ZASTOSUJ ZMIANY: Upraszczanie przechwytywania zmian przy użyciu tabel funkcji Delta Live Tables. Aby zapoznać się z przykładem użycia apply_changes() funkcji, zobacz Przykład: typ SCD 1 i typ SCD 2 przetwarzanie z danymi źródłowymi CDF.

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu apply_changes tabeli docelowej należy uwzględnić __START_AT kolumny i __END_AT z tym samym typem sequence_by danych co pole.

Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.

Argumenty
target

Typ: str

Nazwa tabeli do zaktualizowania. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem apply_changes() funkcji.

Ten parametr jest wymagany.
source

Typ: str

Źródło danych zawierające rekordy CDC.

Ten parametr jest wymagany.
keys

Typ: list

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Możesz określić jedną z następujących opcji:

- Lista ciągów: ["userId", "orderId"]
- Lista funkcji Spark SQL col() : [col("userId"), col("orderId"]

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Ten parametr jest wymagany.
sequence_by

Typ: str lub col()

Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Funkcja Delta Live Tables używa tej sekwencjonowania w celu obsługi zdarzeń zmiany, które docierają poza kolejność.

Możesz określić jedną z następujących opcji:

- Ciąg: "sequenceNum"
- Funkcja Spark SQL col() : col("sequenceNum")

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Określona kolumna musi być sortowalnym typem danych.

Ten parametr jest wymagany.
ignore_null_updates

Typ: bool

Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates ma Truewartość , kolumny z zachowaniem null istniejących wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null. Gdy ignore_null_updates wartość to False, istniejące wartości są zastępowane wartościami null .

Ten parametr jest opcjonalny.

Wartość domyślna to False.
apply_as_deletes

Typ: str lub expr()

Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako grób w bazowej tabeli delty, a widok jest tworzony w magazynie metadanych, który filtruje te grobowce. Interwał przechowywania można skonfigurować za pomocą polecenia
pipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.

Możesz określić jedną z następujących opcji:

- Ciąg: "Operation = 'DELETE'"
- Funkcja Spark SQL expr() : expr("Operation = 'DELETE'")

Ten parametr jest opcjonalny.
apply_as_truncates

Typ: str lub expr()

Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.

Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji obcięć.

Możesz określić jedną z następujących opcji:

- Ciąg: "Operation = 'TRUNCATE'"
- Funkcja Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Ten parametr jest opcjonalny.
column_list

except_column_list

Typ: list

Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list polecenia , aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list polecenia , aby określić kolumny do wykluczenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden column_list argument lub except_column_list argument.
stored_as_scd_type

Typ: str lub int

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podzbiór kolumn wyjściowych do śledzenia historii w tabeli docelowej. Użyj track_history_column_list polecenia , aby określić pełną listę kolumn do śledzenia. Używanie
track_history_except_column_list aby określić kolumny, które mają być wykluczone ze śledzenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, jeśli nie track_history_column_list lub
track_history_except_column_list argument jest przekazywany do funkcji .

Zmienianie przechwytywania danych z migawek bazy danych za pomocą języka Python w tabelach delta live

Ważne

Interfejs APPLY CHANGES FROM SNAPSHOT API jest w publicznej wersji zapoznawczej.

apply_changes_from_snapshot() Użyj funkcji w interfejsie API języka Python, aby używać funkcji przechwytywania zmian danych funkcji delta live tables (CDC) do przetwarzania danych źródłowych z migawek bazy danych.

Ważne

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu apply_changes_from_snapshot() tabeli docelowej należy również uwzględnić __START_AT kolumny i __END_AT o tym samym typie sequence_by danych co pole.

Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Uwaga

W przypadku APPLY CHANGES FROM SNAPSHOT przetwarzania domyślne zachowanie polega na wstawieniu nowego wiersza, gdy pasujący rekord z tymi samymi kluczami nie istnieje w obiekcie docelowym. Jeśli istnieje pasujący rekord, jest aktualizowany tylko wtedy, gdy którakolwiek z wartości w wierszu uległa zmianie. Wiersze z kluczami obecnymi w obiekcie docelowym, ale nie są już obecne w źródle.

Aby dowiedzieć się więcej na temat przetwarzania CDC za pomocą migawek, zobacz Interfejsy API ZASTOSUJ ZMIANY: Upraszczanie przechwytywania zmian przy użyciu tabel delta live. Przykłady użycia funkcji można znaleźć w przykładach okresowego pozyskiwania apply_changes_from_snapshot() migawek i historycznych przykładów pozyskiwania migawek.

Argumenty
target

Typ: str

Nazwa tabeli do zaktualizowania. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed uruchomieniem apply_changes() funkcji.

Ten parametr jest wymagany.
source

Typ: str lub lambda function

Nazwa tabeli lub widoku do migawki okresowo lub funkcja lambda języka Python, która zwraca ramkę danych migawki do przetworzenia i wersję migawki. Zobacz Implementowanie argumentu źródłowego.

Ten parametr jest wymagany.
keys

Typ: list

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Możesz określić jedną z następujących opcji:

- Lista ciągów: ["userId", "orderId"]
- Lista funkcji Spark SQL col() : [col("userId"), col("orderId"]

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Ten parametr jest wymagany.
stored_as_scd_type

Typ: str lub int

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

Podzbiór kolumn wyjściowych do śledzenia historii w tabeli docelowej. Użyj track_history_column_list polecenia , aby określić pełną listę kolumn do śledzenia. Używanie
track_history_except_column_list aby określić kolumny, które mają być wykluczone ze śledzenia. Możesz zadeklarować wartość jako listę ciągów lub jako funkcje Spark SQL col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenty funkcji nie mogą zawierać col() kwalifikatorów. Można na przykład użyć elementu col(userId), ale nie można użyć polecenia col(source.userId).

Ten parametr jest opcjonalny.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, jeśli nie track_history_column_list lub
track_history_except_column_list argument jest przekazywany do funkcji .

Implementowanie argumentu source

Funkcja apply_changes_from_snapshot() zawiera source argument . W przypadku przetwarzania migawek historycznych argument powinien być funkcją lambda języka Python, source która zwraca dwie wartości do apply_changes_from_snapshot() funkcji: ramkę danych języka Python zawierającą dane migawki do przetworzenia i wersję migawki.

Poniżej znajduje się podpis funkcji lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argument funkcji lambda jest ostatnio przetworzoną wersją migawki.
  • Wartość zwracana funkcji lambda to None lub krotka dwóch wartości: Pierwsza wartość krotki to ramka danych zawierająca migawkę do przetworzenia. Drugą wartością krotki jest wersja migawki, która reprezentuje kolejność logiczną migawki.

Przykład, który implementuje i wywołuje funkcję lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Środowisko uruchomieniowe delta Live Tables wykonuje następujące kroki za każdym razem, gdy potok zawierający apply_changes_from_snapshot() funkcję jest wyzwalany:

  1. Uruchamia funkcję w next_snapshot_and_version celu załadowania następnej ramki danych migawki i odpowiedniej wersji migawki.
  2. Jeśli żadna ramka danych nie zostanie zwrócona, przebieg zostanie zakończony, a aktualizacja potoku zostanie oznaczona jako ukończona.
  3. Wykrywa zmiany w nowej migawki i przyrostowo stosuje je do tabeli docelowej.
  4. Wraca do kroku 1, aby załadować następną migawkę i jego wersję.

Ograniczenia

Interfejs delta Live Tables w języku Python ma następujące ograniczenie:

Funkcja nie jest obsługiwana pivot() . Operacja pivot na platformie Spark wymaga chętnego ładowania danych wejściowych w celu obliczenia schematu wyjściowego. Ta funkcja nie jest obsługiwana w tabelach delta live.