Dokumentacja języka PYTHON DLT
Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania języka Python DLT.
Aby uzyskać informacje na temat interfejsu API SQL, zobacz dokumentację języka SQL DLT.
Aby uzyskać szczegółowe informacje dotyczące konfigurowania automatycznego modułu ładującego, zobacz Co to jest moduł automatycznego ładowania?.
Przed rozpoczęciem
Poniżej przedstawiono ważne zagadnienia dotyczące implementowania potoków za pomocą interfejsu JĘZYKA Python DLT:
- Ponieważ funkcje
table()
iview()
języka Python są wywoływane wiele razy podczas planowania i uruchamiania aktualizacji potoku, nie dołączaj kodu w jednej z tych funkcji, które mogą mieć skutki uboczne (na przykład kod modyfikujący dane lub wysyłający wiadomość e-mail). Aby uniknąć nieoczekiwanego zachowania, funkcje języka Python definiujące zestawy danych powinny zawierać tylko kod wymagany do zdefiniowania tabeli lub widoku. - Aby wykonywać operacje, takie jak wysyłanie wiadomości e-mail lub integrowanie z zewnętrzną usługą monitorowania, użyj haków zdarzeń, szczególnie w funkcjach definiujących zestawy danych. Zaimplementowanie tych operacji w funkcjach definiujących zestawy danych spowoduje nieoczekiwane zachowanie.
- Funkcje
table
iview
języka Python muszą zwracać ramkę danych. Niektóre funkcje działające na ramkach danych nie zwracają ramek danych i nie powinny być używane. Te operacje obejmują funkcje, takie jakcollect()
,count()
,toPandas()
,save()
isaveAsTable()
. Ponieważ transformacje ramki danych są wykonywane po, gdy został rozwiązany graf pełnego przepływu danych, użycie takich operacji może mieć niezamierzone skutki uboczne.
Importowanie modułu dlt
Python
Funkcje języka Python DLT są definiowane w module dlt
. Potoki zaimplementowane za pomocą interfejsu API języka Python muszą zaimportować ten moduł:
import dlt
Utwórz zmaterializowany widok DLT lub tabelę przesyłania strumieniowego
W języku Python DLT określa, czy zestaw danych ma być aktualizowany jako zmaterializowany widok, czy też tabela przesyłania strumieniowego w oparciu o definiujące zapytanie. Dekoratora @table
można używać zarówno do definiowania zmaterializowanych widoków, jak i tabel strumieniowych.
Aby zdefiniować zmaterializowany widok w języku Python, zastosuj @table
do zapytania wykonującego statyczny odczyt względem źródła danych. Aby zdefiniować tabelę przesyłania strumieniowego, zastosuj @table
do zapytania, które wykonuje odczyt strumieniowy względem źródła danych lub użyj funkcji create_streaming_table(). Oba typy zestawów danych mają tę samą specyfikację składni w następujący sposób:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Tworzenie widoku DLT
Aby zdefiniować widok w języku Python, zastosuj dekorator @view
. Podobnie jak dekorator @table
, można użyć widoków w technologii DLT dla zestawów danych statycznych lub przesyłanych strumieniowo. Poniżej przedstawiono składnię definiującą widoki przy użyciu języka Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Przykład: Definiowanie tabel i widoków
Aby zdefiniować tabelę lub widok w języku Python, zastosuj @dlt.view
lub dekorator @dlt.table
do funkcji. Możesz użyć nazwy funkcji lub parametru name
, aby przypisać tabelę lub nazwę widoku. Poniższy przykład definiuje dwa różne zestawy danych: widok o nazwie taxi_raw
, który przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data
, która przyjmuje widok taxi_raw
jako dane wejściowe:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("taxi_raw").where(...)
Przykład: uzyskiwanie dostępu do zestawu danych zdefiniowanego w tym samym potoku
Uwaga
Mimo że funkcje dlt.read()
i dlt.read_stream()
są nadal dostępne i w pełni obsługiwane przez interfejs DLT w języku Python, usługa Databricks zaleca zawsze używanie funkcji spark.read.table()
i spark.readStream.table()
z następujących powodów:
- Funkcje
spark
obsługują odczytywanie zarówno wewnętrznych, jak i zewnętrznych zestawów danych, w tym tych zapisanych w magazynie zewnętrznym lub zdefiniowanych w innych potokach. Funkcjedlt
obsługują tylko odczytywanie wewnętrznych zestawów danych. - Funkcje
spark
obsługują określanie opcji, takich jakskipChangeCommits
, w celu odczytu operacji. Określanie opcji nie jest obsługiwane przez funkcjedlt
.
Aby uzyskać dostęp do zestawu danych zdefiniowanego w tym samym potoku, użyj funkcji spark.read.table()
lub spark.readStream.table()
.
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("customers_raw").where(...)
Notatka
Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku można bezpośrednio określić wykaz i schemat lub użyć wartości domyślnych skonfigurowanych w potoku. W tym przykładzie tabela customers
jest zapisywana i odczytywana z domyślnego katalogu i schematu skonfigurowanych dla twojego potoku.
Przykład: odczyt z tabeli zarejestrowanej w magazynie metadanych
Aby odczytać dane z tabeli zarejestrowanej w magazynie metadanych Hive, w argumencie funkcji można zakwalifikować nazwę tabeli przy użyciu nazwy bazy danych:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Aby zapoznać się z przykładem odczytu z tabeli w katalogu Unity, zobacz Pozyskiwanie danych do potoku katalogu Unity.
przykład : uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql
Zestaw danych można również zwrócić przy użyciu wyrażenia spark.sql
w funkcji zapytania. Aby odczytać z wewnętrznego zestawu danych, możesz pozostawić nazwę bez kwalifikacji, aby użyć domyślnego wykazu i schematu, lub można je wstępnie utworzyć:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")
Trwale usuń rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego
Aby trwale usunąć rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, takimi jak zgodność z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach delty obiektu. Aby zapewnić usunięcie rekordów z zmaterializowanego widoku, zobacz Trwałe usuwanie rekordów z zmaterializowanego widoku z włączonymi wektorami usuwania. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.
zapisywanie w zewnętrznych usługach przesyłania strumieniowego zdarzeń lub w tabelach delta przy użyciu interfejsu API sink
DLT
Notatka
- Uruchomienie aktualizacji pełnej odświeżania nie powoduje wyczyszczenia danych z ujść. Wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
- Oczekiwania DLT nie są obsługiwane w API
sink
.
Aby zapisać dane do usługi przesyłania strumieniowego zdarzeń, takiej jak Apache Kafka lub Azure Event Hubs, bądź do tabeli Delta z potoku DLT, użyj funkcji create_sink()
zawartej w module dlt
Python. Po utworzeniu ujścia za pomocą funkcji create_sink()
należy użyć ujścia w przepływie dołączania do zapisywania danych w ujściu. Przepływ dołączania jest jedynym typem przepływu obsługiwanym przez funkcję create_sink()
. Inne typy przepływów, takie jak apply_changes
, nie są obsługiwane.
Poniżej przedstawiono składnię umożliwiającą utworzenie ujścia za pomocą funkcji create_sink()
:
create_sink(<sink_name>, <format>, <options>)
Argumenty |
---|
name Typ: str Ciąg, który identyfikuje ujście i służy do odwoływania się do niego oraz zarządzania nim. Nazwy wyjść muszą być unikatowe dla pipeline'u, we wszystkich fragmentach kodu źródłowego, takich jak notatniki lub moduły będące częścią pipeline'u. Ten parametr jest wymagany. |
format Typ: str Ciąg definiujący format danych wyjściowych kafka lub delta .Ten parametr jest wymagany. |
options Typ: dict Opcjonalna lista opcji ujścia, sformatowana jako {"key": "value"} , gdzie zarówno klucz, jak i wartość są ciągami znaków. Obsługiwane są wszystkie opcje środowiska Databricks Runtime, które są obsługiwane przez wyjścia Kafka i Delta. Aby uzyskać informacje o opcjach Kafka, zobacz Konfigurowanie zapisywania przesyłania strumieniowego w Kafka. Aby uzyskać informacje o opcjach funkcji delta, zobacz tabelę delty jako ujście. |
Przykład: tworzenie sinku dla platformy Kafka za pomocą funkcji create_sink()
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
Przykład: tworzenie ujścia delty za pomocą funkcji create_sink()
i ścieżki systemu plików
Poniższy przykład tworzy odbiornik danych, który zapisuje do tabeli Delta, przekazując ścieżkę systemu plików do tej tabeli.
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
Przykład: tworzenie ujścia Delta za pomocą funkcji create_sink()
oraz nazwy tabeli w Unity Catalog
Notatka
Zlew Delta supportuje zewnętrzne i zarządzane tabele Unity Catalog oraz zarządzane tabele w magazynie metadanych Hive. Nazwy tabel muszą być jednoznacznie określone. Na przykład tabele Unity Catalog muszą używać identyfikatora trójpoziomowego: <catalog>.<schema>.<table>
. Tabele metadanych Hive muszą używać <schema>.<table>
.
Poniższy przykład tworzy ujście, które zapisuje w tabeli Delta, podając nazwę tabeli w katalogu Unity.
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
Przykład: użycie przepływu dołączania do pisania do delta sinka
Poniższy przykład tworzy miejsce docelowe, które zapisuje dane do tabeli Delta, a następnie tworzy proces dołączania, aby zapisywać w tym miejscu docelowym.
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
Przykład: używanie przepływu dołączania do zapisu w ujściu platformy Kafka
Poniższy przykład tworzy ujście, które zapisuje w temacie platformy Kafka, a następnie tworzy przepływ dołączania do zapisu w tym ujściu:
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Schemat ramki danych zapisanej na platformie Kafka powinien zawierać kolumny określone w Konfigurowanie składnika zapisywania przesyłania strumieniowego ze strukturą platformy Kafka.
Utwórz tabelę do użycia jako element docelowy operacji przesyłania strumieniowego
Użyj funkcji create_streaming_table()
, aby utworzyć tabelę docelową dla rekordów generowanych przez operacje przesyłania strumieniowego, w tym apply_changes(), apply_changes_from_snapshot()i @append_flow.
Notatka
Funkcje create_target_table()
i create_streaming_live_table()
są przestarzałe. Usługa Databricks zaleca aktualizowanie istniejącego kodu w celu korzystania z funkcji create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumenty |
---|
name Typ: str Nazwa tabeli. Ten parametr jest wymagany. |
comment Typ: str Opcjonalny opis tabeli. |
spark_conf Typ: dict Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania. |
table_properties Typ: dict Opcjonalna lista właściwości tabeli dla tabeli. |
partition_cols Typ: array Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli. |
cluster_by Typ: array Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Użyj klastrowania cieczy dla tabel Delta. |
path Typ: str Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system zostanie domyślnie ustawiony na lokalizację przechowywania potoku. |
schema Typ: str lub StructType Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Opcjonalne ograniczenia dotyczące jakości danych dla tabeli. Zobacz różne oczekiwania. |
row_filter (publiczna wersja zapoznawcza)Typ: str Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn. |
Kontrolowanie sposobu materializacji tabel
Tabele oferują również dodatkową kontrolę nad ich materializacją:
- Określ sposób tabel klastra przy użyciu
cluster_by
. Możesz użyć płynnego klastrowania, aby przyspieszyć zapytania. Zobacz Użyj klastrowania płynnego dla tabel Delta. - Określ sposób partycjonowania tabel przy użyciu
partition_cols
. - Właściwości tabeli można ustawić podczas definiowania widoku lub tabeli. Zobacz właściwości tabeli DLT .
- Ustaw lokalizację przechowywania danych tabeli przy użyciu ustawienia
path
. Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeślipath
nie jest ustawiona. - W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określ schemat i kolumny klastrowe.
Notatka
W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca, aby DLT kontrolowało organizację danych. Nie należy określać kolumn partycji, chyba że spodziewasz się, że tabela przekroczy terabajt.
Przykład: określić schemat i kolumny klastra
Opcjonalnie można określić schemat tabeli przy użyciu języka Python StructType
lub ciągu DDL SQL. Gdy zdefiniowano ciąg DDL, definicja może zawierać kolumny wygenerowane .
Poniższy przykład tworzy tabelę o nazwie sales
ze schematem określonym przy użyciu StructType
języka Python:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumny klastrowania:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
cluster_by = ["order_day_of_week", "customer_id"])
def sales():
return ("...")
Domyślnie biblioteka DLT wywnioskuje schemat z definicji table
, jeśli nie określisz schematu.
Przykład: określanie kolumn partycji
Poniższy przykład określa schemat tabeli przy użyciu ciągu DDL, definiuje wygenerowaną kolumnę i definiuje kolumnę partycji:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Przykład: Definiowanie ograniczeń tabeli
Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.
W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Przykład: Definiowanie filtru wierszy i maski kolumn
Ważny
Filtry wierszy i maski kolumn znajdują się w publicznej wersji zapoznawczej.
Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy oraz maską kolumn, użyj klauzuli ROW FILTER oraz klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.
Skonfiguruj tabelę streamingową, aby ignorować zmiany w źródłowej tabeli streamingowej
Notatka
Domyślnie tabele przesyłania strumieniowego wymagają źródeł tylko do dołączania. Jeśli tabela przesyłania strumieniowego używa innej tabeli przesyłania strumieniowego jako źródła, a źródłowa tabela przesyłania strumieniowego wymaga aktualizacji lub usunięcia, na przykład przetwarzania RODO "prawo do zapomnienia", podczas odczytywania źródłowej tabeli przesyłania strumieniowego można ustawić flagę skipChangeCommits
w celu zignorowania tych zmian. Aby uzyskać więcej informacji na temat tej flagi, zobacz Ignoruj aktualizacje i usuwa.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
właściwości DLT w Pythonie
W poniższych tabelach opisano opcje i właściwości, które można określić podczas definiowania tabel i widoków za pomocą biblioteki DLT:
@table lub @view |
---|
name Typ: str Opcjonalna nazwa tabeli lub widoku. Jeśli nie jest zdefiniowana, nazwa funkcji jest używana jako nazwa tabeli lub widoku. |
comment Typ: str Opcjonalny opis tabeli. |
spark_conf Typ: dict Opcjonalna lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania. |
table_properties Typ: dict Opcjonalna lista właściwości tabeli dla tabeli. |
path Typ: str Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiony, system domyślnie przyjmuje lokalizację przechowywania potoku. |
partition_cols Typ: a collection of str Opcjonalna kolekcja, na przykład list z co najmniej jednej kolumny do partycjonowania tabeli. |
cluster_by Typ: array Opcjonalnie włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Użyj klastrowania cieczy dla tabel Delta. |
schema Typ: str lub StructType Opcjonalna definicja schematu dla tabeli. Schematy można zdefiniować jako ciąg DDL SQL lub w języku Python StructType . |
temporary Typ: bool Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Słowo kluczowe temporary instruuje DLT o utworzeniu tabeli, która jest dostępna dla potoku, ale nie powinna być używana poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który ją tworzy, a nie tylko pojedynczej aktualizacji.Wartość domyślna to "False". |
row_filter (publiczna wersja zapoznawcza)Typ: str Opcjonalna klauzula filtru wierszy dla tabeli. Zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn. |
Definicja tabeli lub widoku |
---|
def <function-name>() Funkcja języka Python, która definiuje zestaw danych. Jeśli parametr name nie jest ustawiony, <function-name> jest używana jako nazwa docelowego zestawu danych. |
query Instrukcja Spark SQL zwracająca zestaw danych Platformy Spark lub ramkę danych Koalas. Użyj dlt.read() lub spark.read.table() , aby wykonać pełny odczyt z zestawu danych zdefiniowanego w tym samym potoku. Aby odczytać zewnętrzny zestaw danych, użyj funkcji spark.read.table() . Nie można użyć dlt.read() do odczytywania zewnętrznych zestawów danych. Ponieważ spark.read.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read() .Podczas definiowania zestawu danych w potoku domyślnie będzie on używać wykazu i schematu zdefiniowanego w konfiguracji potoku. Możesz użyć funkcji spark.read.table() do odczytu z zestawu danych zdefiniowanego w ramach procesu bez kwalifikacji. Aby na przykład odczytać z zestawu danych o nazwie customers :spark.read.table("customers") Możesz również użyć funkcji spark.read.table() do odczytu z tabeli zarejestrowanej w magazynie metadanych, opcjonalnie kwalifikując nazwę tabeli o nazwie bazy danych:spark.read.table("sales.customers") Użyj dlt.read_stream() lub spark.readStream.table() , aby wykonać odczyt strumieniowy z zestawu danych zdefiniowanego w tym samym potoku. Aby wykonać odczyt strumieniowy z zewnętrznego zestawu danych, użyjfunkcja spark.readStream.table() . Ponieważ spark.readStream.table() może służyć do odczytywania wewnętrznych zestawów danych, zestawów danych zdefiniowanych poza bieżącym potokiem i umożliwia określenie opcji odczytywania danych, usługa Databricks zaleca użycie go zamiast funkcji dlt.read_stream() .Aby zdefiniować zapytanie w funkcji table DLT przy użyciu składni JĘZYKA SQL, użyj funkcji spark.sql . Zobacz Przykład: uzyskiwanie dostępu do zestawu danych przy użyciu spark.sql . Aby zdefiniować zapytanie w funkcji table DLT przy użyciu języka Python, użyj składni PySpark. |
Oczekiwania |
---|
@expect("description", "constraint") Deklarowanie ograniczenia jakości danych zidentyfikowanego przez description . Jeśli wiersz narusza oczekiwania, uwzględnij wiersz w docelowym zestawie danych. |
@expect_or_drop("description", "constraint") Deklarowanie ograniczenia jakości danych zidentyfikowanego przez description . Jeśli wiersz narusza oczekiwania, upuść wiersz z docelowego zestawu danych. |
@expect_or_fail("description", "constraint") Deklarowanie ograniczenia jakości danych zidentyfikowanego przez description . Jeśli wiersz narusza warunki, natychmiast zatrzymaj wykonywanie. |
@expect_all(expectations) Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych. expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, uwzględnij wiersz w docelowym zestawie danych. |
@expect_all_or_drop(expectations) Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych. expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, upuść wiersz z docelowego zestawu danych. |
@expect_all_or_fail(expectations) Zadeklaruj co najmniej jedno ograniczenie dotyczące jakości danych. expectations to słownik języka Python, w którym klucz jest opisem oczekiwania, a wartość jest ograniczeniem oczekiwania. Jeśli wiersz narusza jakiekolwiek oczekiwania, natychmiast zatrzymaj wykonywanie. |
przechwytywanie danych ze zestawienia zmian za pomocą języka Python w technologii DLT
Użyj funkcji apply_changes()
w interfejsie API języka Python, aby używać funkcjonalności przechwytywania zmian danych dla DLT (CDC) do przetwarzania danych źródłowych z kanału danych o zmianach (CDF).
Ważny
Aby zastosować zmiany, należy zadeklarować tabelę docelową przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes()
należy uwzględnić kolumny __START_AT
i __END_AT
o tym samym typie danych co pola sequence_by
.
Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Notatka
W przypadku przetwarzania APPLY CHANGES
domyślne zachowanie INSERT
i zdarzeń UPDATE
polega na upsert zdarzeń CDC ze źródła: zaktualizuj wszystkie wiersze w tabeli docelowej zgodne z określonymi kluczami lub wstaw nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE
można określić przy użyciu warunku APPLY AS DELETE WHEN
.
Aby dowiedzieć się więcej na temat przetwarzania CDC z użyciem strumienia zmian, zobacz Interfejsy API APPLY CHANGES: uproszczenie przechwytywania zmian danych przy użyciu DLT. Aby zapoznać się z przykładem użycia funkcji apply_changes()
, zobacz Przykład: przetwarzanie dla typów SCD 1 i SCD 2 przy użyciu danych źródłowych CDF.
Ważny
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes
należy uwzględnić kolumny __START_AT
i __END_AT
o tym samym typie danych co pole sequence_by
.
Zobacz Interfejsy API APPLY CHANGES: upraszczanie przechwytywania zmian danych za pomocą DLT.
Argumenty |
---|
target Typ: str Nazwa tabeli, którą należy zaktualizować. Za pomocą funkcji create_streaming_table() można utworzyć tabelę docelową przed wykonaniem funkcji apply_changes() .Ten parametr jest wymagany. |
source Typ: str Źródło danych zawierające rekordy CDC. Ten parametr jest wymagany. |
keys Typ: list Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Możesz określić jedną z następujących opcji: - Lista ciągów: ["userId", "orderId"] — Lista funkcji usługi Spark SQL col() : [col("userId"), col("orderId"] Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Ten parametr jest wymagany. |
sequence_by Typ: str lub col() Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Biblioteka DLT używa tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają z niewłaściwej kolejności. Możesz określić jedną z następujących opcji: - Ciąg: "sequenceNum" - Funkcja col() Spark SQL: col("sequenceNum") Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Określona kolumna musi być sortowalnym typem danych. Ten parametr jest wymagany. |
ignore_null_updates Typ: bool Umożliwiaj przyjmowanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i ignore_null_updates jest True , kolumny z null zachowują istniejące wartości w obiekcie docelowym. To również dotyczy kolumn zagnieżdżonych o wartości null . Gdy ignore_null_updates jest False , istniejące wartości są zastępowane wartościami null .Ten parametr jest opcjonalny. Wartość domyślna to False . |
apply_as_deletes Typ: str lub expr() Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE , a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako znacznik w bazowej tabeli Delta, a widok jest tworzony w metastore, który filtruje te znaczniki. Interwał przechowywania można skonfigurować za pomocą poleceniapipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.Możesz określić jedną z następujących opcji: - Ciąg: "Operation = 'DELETE'" - Funkcja expr() Spark SQL: expr("Operation = 'DELETE'") Ten parametr jest opcjonalny. |
apply_as_truncates Typ: str lub expr() Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE . Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.Parametr apply_as_truncates jest obsługiwany tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji ucięcia.Możesz określić jedną z następujących opcji: - Ciąg: "Operation = 'TRUNCATE'" - Funkcja expr() Spark SQL: expr("Operation = 'TRUNCATE'") Ten parametr jest opcjonalny. |
column_list except_column_list Typ: list Podzbiór kolumn do uwzględnienia w tabeli docelowej. Użyj column_list , aby określić pełną listę kolumn do uwzględnienia. Użyj except_column_list , aby określić kolumny do wykluczenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Ten parametr jest opcjonalny. Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy do funkcji nie zostanie przekazany żaden argument column_list lub except_column_list . |
stored_as_scd_type Typ: str lub int Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.Ta klauzula jest opcjonalna. Wartość domyślna to SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list Podzbiór kolumn wyjściowych, które mają być śledzone pod kątem historii w tabeli docelowej. Użyj track_history_column_list , aby określić pełną listę kolumn do śledzenia. Używaćtrack_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Ten parametr jest opcjonalny. Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy brak track_history_column_list lub...track_history_except_column_list argument jest przekazywany do funkcji. |
zmienianie przechwytywania danych z migawek bazy danych za pomocą języka Python w technologii DLT
Ważny
Interfejs API APPLY CHANGES FROM SNAPSHOT
znajduje się w wersji zapoznawczej dla użytkowników publicznych.
Użyj funkcji apply_changes_from_snapshot()
w interfejsie API języka Python, aby używać funkcji przechwytywania zmian DLT (CDC) do przetwarzania danych źródłowych z migawek bazy danych.
Ważny
Aby wprowadzić zmiany, należy zdeklarować docelową tabelę do przesyłania danych. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej apply_changes_from_snapshot()
należy również uwzględnić kolumny __START_AT
i __END_AT
o tym samym typie danych co pole sequence_by
.
Aby utworzyć wymaganą tabelę docelową, możesz użyć funkcji create_streaming_table() w interfejsie języka Python DLT.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Notatka
W przypadku przetwarzania APPLY CHANGES FROM SNAPSHOT
domyślne zachowanie polega na wstawieniu nowego wiersza, gdy pasujący rekord z tymi samymi kluczami nie istnieje w obiekcie docelowym. Jeśli istnieje pasujący rekord, jest aktualizowany tylko wtedy, gdy którakolwiek z wartości w wierszu uległa zmianie. Wiersze z kluczami obecnymi w obiekcie docelowym, ale już nieobecnymi w źródle, są usuwane.
Aby dowiedzieć się więcej o przetwarzaniu CDC za pomocą migawek, zobacz Interfejsy API DO ZASTOSOWANIA ZMIAN: Uproszczenie przechwytywania zmian danych za pomocą DLT. Przykłady użycia funkcji apply_changes_from_snapshot()
można znaleźć w przykładach okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.
Argumenty |
---|
target Typ: str Nazwa tabeli do zaktualizowania. Możesz użyć funkcji create_streaming_table(), aby utworzyć tabelę docelową przed uruchomieniem funkcji apply_changes() .Ten parametr jest wymagany. |
source Typ: str lub lambda function Albo nazwa tabeli lub widoku do przechwytywania okresowego, albo funkcja lambda w języku Python, która zwraca ramkę danych do przetworzenia oraz wersję migawki. Zobacz Implementuj argument source .Ten parametr jest wymagany. |
keys Typ: list Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Możesz określić jedną z następujących opcji: - Lista ciągów: ["userId", "orderId"] — Lista funkcji usługi Spark SQL col() : [col("userId"), col("orderId"] Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Ten parametr jest wymagany. |
stored_as_scd_type Typ: str lub int Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ustaw wartość 1 dla typu SCD 1 lub 2 dla typu SCD 2.Ta klauzula jest opcjonalna. Wartość domyślna to SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list Podzbiór kolumn wyjściowych do śledzenia historii w tabeli docelowej. Użyj track_history_column_list , aby określić pełną listę kolumn do śledzenia. Użyjtrack_history_except_column_list określić kolumny, które mają być wykluczone ze śledzenia. Wartość można zadeklarować jako listę ciągów lub jako funkcje usługi Spark SQL col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumenty col() funkcji nie mogą zawierać kwalifikatorów. Można na przykład użyć col(userId) , ale nie można użyć col(source.userId) .Ten parametr jest opcjonalny. Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, jeśli nie track_history_column_list lubtrack_history_except_column_list argument jest przekazywany do funkcji. |
Implementowanie argumentu source
Funkcja apply_changes_from_snapshot()
zawiera argument source
. W przypadku przetwarzania migawek historycznych argument source
powinien być funkcją lambda języka Python, która zwraca dwie wartości do funkcji apply_changes_from_snapshot()
: ramkę danych języka Python zawierającą dane migawki do przetworzenia i wersję migawki.
Poniżej znajduje się podpis funkcji lambda:
lambda Any => Optional[(DataFrame, Any)]
- Argumentem funkcji lambda jest najnowsza przetworzona wersja migawki.
- Wartość zwracana funkcji lambda jest
None
lub krotka dwóch wartości: Pierwsza wartość krotki to ramka danych zawierająca migawkę do przetworzenia. Drugą wartością krotki jest wersja migawki, która reprezentuje kolejność logiczną migawki.
Przykład, który implementuje i wywołuje funkcję lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Środowisko uruchomieniowe DLT wykonuje następujące kroki za każdym razem, gdy pipeline zawierający funkcję apply_changes_from_snapshot()
zostaje uruchomiony:
- Uruchamia funkcję
next_snapshot_and_version
, aby załadować następną ramkę migawki i odpowiadającą jej wersję migawki. - Jeśli nie zostanie zwrócony żaden DataFrame, proces zostanie zakończony, a aktualizacja pipeline'u zostanie ukończona.
- Wykrywa zmiany w nowym zrzucie i przyrostowo stosuje je do tabeli docelowej.
- Wraca do kroku nr 1, aby załadować następną migawkę i jej wersję.
Ograniczenia
Interfejs DLT w języku Python ma następujące ograniczenie:
Funkcja pivot()
nie jest obsługiwana. Operacja pivot
na platformie Spark wymaga ładowania danych wejściowych w trybie gorliwym w celu obliczenia schematu wyjściowego. Ta funkcja nie jest obsługiwana w technologii DLT.