Condividi tramite


Informazioni di riferimento sul linguaggio Python DLT

Questo articolo contiene informazioni dettagliate sull'interfaccia di programmazione Python DLT.

Per informazioni sull'API SQL, vedere le informazioni di riferimento sul linguaggio SQL DLT .

Per informazioni dettagliate sulla configurazione del caricatore automatico, vedere Che cos'è il caricatore automatico?.

Prima di iniziare

Di seguito sono riportate considerazioni importanti quando si implementano le pipeline con l'interfaccia Python DLT:

  • Poiché le funzioni di table() Python e view() vengono richiamate più volte durante la pianificazione e l'esecuzione di un aggiornamento della pipeline, non includere il codice in una di queste funzioni che potrebbero avere effetti collaterali, ad esempio codice che modifica i dati o invia un messaggio di posta elettronica. Per evitare comportamenti imprevisti, le funzioni Python che definiscono i set di dati devono includere solo il codice necessario per definire la tabella o la vista.
  • Per eseguire operazioni come l'invio di messaggi di posta elettronica o l'integrazione con un servizio di monitoraggio esterno, in particolare nelle funzioni che definiscono i set di dati, usare hook di eventi. L'implementazione di queste operazioni nelle funzioni che definiscono i set di dati causerà un comportamento imprevisto.
  • Le funzioni table Python e view devono restituire un dataframe. Alcune funzioni che operano su dataframe non restituiscono dataframe e non devono essere usate. Queste operazioni includono funzioni come collect(), count(), toPandas(), save()e saveAsTable(). Poiché le trasformazioni dataframe vengono eseguite dopo il grafico del flusso di dati completo è stato risolto, l'uso di tali operazioni potrebbe avere effetti collaterali imprevisti.

Importare il modulo Python dlt

Le funzioni Python DLT vengono definite nel modulo dlt. Le pipeline implementate con l'API Python devono importare questo modulo:

import dlt

Creare una vista materializzata DLT o una tabella di streaming

In Python DLT determina se aggiornare un set di dati come vista materializzata o tabella di streaming in base alla query di definizione. L'@table decorator può essere usato per definire sia le viste materializzate che le tabelle di streaming.

Per definire una vista materializzata in Python, applicare @table a una query che esegue una lettura statica su un'origine dati. Per definire una tabella di streaming, applicare @table a una query che esegue una lettura in streaming da un'origine dati o usare la funzione create_streaming_table(). Entrambi i tipi di set di dati hanno la stessa specifica di sintassi come indicato di seguito:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Creare una vista DLT

Per definire una visualizzazione in Python, applicare l'@view decorator. Analogamente alla @table decorator, è possibile usare le visualizzazioni in DLT per set di dati statici o di streaming. Di seguito è riportata la sintassi per la definizione delle viste con Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Esempio: Definire tabelle e viste

Per definire una tabella o una vista in Python, applicare il @dlt.view o @dlt.table decorator a una funzione. È possibile usare il nome della funzione o il parametro name per assegnare il nome della tabella o della vista. L'esempio seguente definisce due set di dati diversi: una vista denominata taxi_raw che accetta un file JSON come origine di input e una tabella denominata filtered_data che accetta la vista taxi_raw come input:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("taxi_raw").where(...)

Esempio: Accedere a un set di dati definito nella stessa pipeline

Nota

Anche se le funzioni dlt.read() e dlt.read_stream() sono ancora disponibili e completamente supportate dall'interfaccia Python DLT, Databricks consiglia di usare sempre le funzioni spark.read.table() e spark.readStream.table() a causa dei seguenti elementi:

  • Le funzioni spark supportano la lettura di set di dati interni ed esterni, inclusi i set di dati nell'archiviazione esterna o definiti in altre pipeline. Le funzioni dlt supportano solo la lettura di set di dati interni.
  • Le funzioni di spark supportano la specifica delle opzioni, come skipChangeCommits, per le operazioni di lettura. La specifica delle opzioni non è supportata dalle funzioni di dlt.

Per accedere a un set di dati definito nella stessa pipeline, usare le funzioni spark.read.table() o spark.readStream.table():

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("customers_raw").where(...)

Nota

Quando si eseguono query su viste o tabelle nella pipeline, è possibile specificare direttamente il catalogo e lo schema oppure usare le impostazioni predefinite configurate nella pipeline. In questo esempio la tabella customersviene scritta e letta dal catalogo e dallo schema predefiniti configurati per la pipeline.

Esempio: Leggere da una tabella registrata in un metastore

Per leggere i dati da una tabella registrata nel metastore Hive, nell'argomento della funzione è possibile qualificare il nome della tabella con il nome del database:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Per un esempio di lettura da una tabella del catalogo Unity, vedere Inserire dati in una pipeline del catalogo Unity.

esempio di : accedere a un set di dati usando spark.sql

È anche possibile restituire un set di dati usando un'espressione spark.sql in una funzione di query. Per leggere da un set di dati interno, è possibile lasciare il nome non qualificato per usare il catalogo e lo schema predefiniti oppure anteporrli:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM catalog_name.schema_name.customers_cleaned WHERE city = 'Chicago'")

Eliminare definitivamente i record da una vista materializzata o da una tabella di streaming

Per eliminare definitivamente i record da una vista materializzata o da una tabella di streaming con vettori di eliminazione abilitati, ad esempio per la conformità al GDPR, è necessario eseguire operazioni aggiuntive sulle tabelle Delta sottostanti dell'oggetto. Per garantire l'eliminazione dei record da una vista materializzata, vedere Eliminare definitivamente i record da una vista materializzata con vettori di eliminazione abilitati. Per garantire l'eliminazione dei record da una tabella di streaming, vedere Eliminare definitivamente i record da una tabella di streaming.

Utilizzare l'API DLT sink per scrivere su servizi esterni di streaming di eventi o su tabelle Delta

Importante

L'API sink DLT si trova in anteprima pubblica.

Nota

  • L'esecuzione di un aggiornamento completo non cancella i dati dai sink. Tutti i dati elaborati verranno aggiunti al sink e i dati esistenti non verranno modificati.
  • Le aspettative DLT non sono supportate con l'API sink.

Per scrivere in un servizio di streaming di eventi, ad esempio Apache Kafka o Hub eventi di Azure o in una tabella Delta da una pipeline DLT, usare la funzione create_sink() inclusa nel modulo Python dlt. Dopo aver creato un sink con la funzione create_sink(), si usa il sink in un flusso di accodamento per scrivere i dati nel sink. il flusso di accodamento è l'unico tipo di flusso supportato con la funzione create_sink(). Altri tipi di flusso, ad esempio apply_changes, non sono supportati.

Di seguito è riportata la sintassi per creare un sink con la funzione create_sink():

create_sink(<sink_name>, <format>, <options>)
Argomenti
name
Tipo: str
Stringa che identifica il sink e viene utilizzata per fare riferimento e gestire il sink. I nomi dei sink devono essere univoci per la pipeline, incluso nei codici sorgente, come ad esempio notebook o moduli che fanno parte della pipeline.
Questo parametro è obbligatorio.
format
Tipo: str
Stringa che definisce il formato di output, kafka o delta.
Questo parametro è obbligatorio.
options
Tipo: dict
Elenco facoltativo di opzioni sink, formattate come {"key": "value"}, in cui la chiave e il valore sono entrambe stringhe. Sono supportate tutte le opzioni di Databricks Runtime supportate dai sink Kafka e Delta. Per le opzioni Kafka, vedere Configurare lo scrittore di streaming strutturato Kafka. Per le opzioni Delta, vedere la tabella Delta come destinazione.

esempio : Creare un sink Kafka con la funzione create_sink()

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

Esempio: Creare un Sink Delta con la funzione create_sink() e un percorso del file system

Nell'esempio seguente viene creato un sink che scrive in una tabella Delta passando il percorso del file system alla tabella:

create_sink(
  "my_delta_sink",
    "delta",
    { "path": "//path/to/my/delta/table" }
)

esempio : Creare un sink Delta con la funzione create_sink() e il nome di una tabella nel catalogo Unity

Nota

Il sink Delta supporta tabelle esterne e gestite di Unity Catalog e tabelle gestite del metastore Hive. I nomi delle tabelle devono essere completamente qualificati. Ad esempio, le tabelle del catalogo Unity devono usare un identificatore a tre livelli: <catalog>.<schema>.<table>. Le tabelle metastore Hive devono usare <schema>.<table>.

L'esempio seguente crea un sink che scrive in una tabella Delta passando il nome di una tabella nel catalogo Unity:

create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)

Esempio : Usare un flusso di accodamento per scrivere in un sink Delta

Nel seguente esempio si crea un sink che scrive in una tabella Delta e poi si crea un flusso di accodamento per scrivere in quel sink.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

esempio : usare un flusso di accodamento per scrivere in un sink Kafka

Nell'esempio seguente viene creato un sink che scrive in un topic Kafka e poi viene creato un flusso di aggiunta per scrivere a quel sink:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))

Lo schema del dataframe scritto in Kafka deve includere le colonne specificate in Configurare il writer di streaming strutturato Kafka.

Creare una tabella da usare come destinazione delle operazioni di streaming

Usare la funzione create_streaming_table() per creare una tabella di destinazione per i record di output dalle operazioni di streaming, tra cui apply_changes(), apply_changes_from_snapshot()e i record di output di @append_flow.

Nota

Le funzioni create_target_table() e create_streaming_live_table() sono deprecate. Databricks consiglia di aggiornare il codice esistente per usare la funzione create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argomenti
name
Tipo: str
Nome della tabella.
Questo parametro è obbligatorio.
comment
Tipo: str
Descrizione facoltativa per la tabella.
spark_conf
Tipo: dict
Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query.
table_properties
Tipo: dict
Elenco facoltativo delle proprietà della tabella per la tabella.
partition_cols
Tipo: array
Elenco facoltativo di una o più colonne da utilizzare per il partizionamento della tabella.
cluster_by
Tipo: array
Facoltativamente, abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering.
Consultare Usare il raggruppamento liquido per le tabelle Delta.
path
Tipo: str
Un'opzione di archiviazione per i dati della tabella. Se non è impostata, il sistema usa per impostazione predefinita il percorso di archiviazione della pipeline.
schema
Tipo: str o StructType
Definizione dello schema facoltativa per la tabella. Gli schemi possono essere definiti come stringa DDL SQL o con python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail
Tipo: dict
Vincoli di qualità dei dati facoltativi per la tabella. Vedi molte aspettative.
row_filter (anteprima pubblica)
Tipo: str
Clausola di filtro di riga facoltativa per la tabella. Vedere Pubblicare tabelle con filtri di riga e maschere di colonna.

Controllare la modalità di materializzazione delle tabelle

Le tabelle offrono anche un controllo aggiuntivo della loro materializzazione.

  • Specificare come tabelle di cluster usando cluster_by. È possibile usare il clustering liquido per velocizzare le query. Vedere Usare clustering liquido per le tabelle Delta.
  • Specificare il modo in cui le tabelle vengono partizionate usando partition_cols.
  • È possibile impostare le proprietà della tabella quando si definisce una vista o una tabella. Consulta le proprietà della tabella DLT .
  • Impostare un percorso di archiviazione per i dati della tabella usando l'impostazione path. Per impostazione predefinita, i dati della tabella vengono archiviati nel percorso di archiviazione della pipeline se path non è impostato.
  • Nel tuo schema di definizione, è possibile usare colonne generate . Consultare Esempio: Specificare uno schema e le colonne del cluster.

Nota

Per le tabelle di dimensioni inferiori a 1 TB, Databricks consiglia di lasciare che DLT gestisca l'organizzazione dei dati. Non dovresti specificare colonne di partizione a meno che non ti aspetti che la tua tabella cresca oltre un terabyte.

esempio di : specificare uno schema e colonne del cluster

Facoltativamente, è possibile specificare uno schema di tabella usando un StructType Python o una stringa DDL SQL. Quando specificato con una stringa DDL, la definizione può includere colonne generati automaticamente.

L'esempio seguente crea una tabella denominata sales con uno schema specificato usando un StructTypePython :

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Nell'esempio seguente viene specificato lo schema per una tabella usando una stringa DDL, viene definita una colonna generata e vengono definite le colonne di clustering:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  cluster_by = ["order_day_of_week", "customer_id"])
def sales():
  return ("...")

Per impostazione predefinita, DLT deduce lo schema dalla definizione di table se non si specifica uno schema.

Esempio di : Specificare le colonne di partizione

Nell'esempio seguente viene specificato lo schema per una tabella usando una stringa DDL, viene definita una colonna generata e viene definita una colonna di partizione:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Esempio: Definire vincoli di tabella

Importante

I vincoli di tabella si trovano in anteprima pubblica.

Quando si specifica uno schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Consultare la clausola CONSTRAINT nel riferimento al linguaggio SQL.

Nell'esempio seguente viene definita una tabella con un vincolo di chiave primaria ed esterna:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Esempio: Definire un filtro di riga e una maschera di colonna

Importante

I filtri di riga e le maschere di colonna si trovano in anteprima pubblica.

Per creare una vista materializzata o una tabella di streaming con un filtro di riga e una maschera di colonna, utilizzare la clausola ROW FILTER e la clausola MASK. Nell'esempio seguente viene illustrato come definire una vista materializzata e una tabella di streaming con un filtro di riga e una maschera di colonna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Per altre informazioni sui filtri di riga e sulle maschere di colonna, vedere Pubblicare tabelle con filtri di riga e maschere di colonna.

Configurare una tabella di streaming per ignorare le modifiche in una tabella di streaming di origine

Nota

  • Il flag skipChangeCommits funziona solo con spark.readStream usando la funzione option(). Non è possibile usare questo flag in una funzione dlt.read_stream().
  • Non è possibile usare il flag skipChangeCommits quando la tabella di streaming di origine è definita come destinazione di una funzione apply_changes().

Per impostazione predefinita, le tabelle di streaming richiedono fonti a sola aggiunta. Quando una tabella di streaming usa un'altra tabella di streaming come origine e la tabella di streaming di origine richiede aggiornamenti o eliminazioni, ad esempio l'elaborazione del GDPR "diritto all'oblio", il flag skipChangeCommits può essere impostato durante la lettura della tabella di streaming di origine per ignorare tali modifiche. Per altre informazioni su questo flag, vedere Ignorare gli aggiornamenti ed eliminare.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

proprietà DLT Python

Le tabelle seguenti descrivono le opzioni e le proprietà che è possibile specificare durante la definizione di tabelle e viste con DLT:

@table o @view
name
Tipo: str
Nome facoltativo per la tabella o la vista. Se non definito, il nome della funzione viene usato come nome della tabella o della vista.
comment
Tipo: str
Descrizione facoltativa per la tabella.
spark_conf
Tipo: dict
Elenco facoltativo delle configurazioni di Spark per l'esecuzione di questa query.
table_properties
Tipo: dict
Elenco facoltativo delle proprietà della tabella per la tabella.
path
Tipo: str
Luogo di archiviazione facoltativo per i dati della tabella. Se non è impostata, il sistema usa per impostazione predefinita il percorso di archiviazione della pipeline.
partition_cols
Tipo: a collection of str
Una raccolta facoltativa, come ad esempio un list composto da una o più colonne, da utilizzare per il partizionamento della tabella.
cluster_by
Tipo: array
Facoltativamente, abilitare il clustering liquido nella tabella e definire le colonne da usare come chiavi di clustering.
Consulta Utilizzare il clustering liquido per le tabelle Delta.
schema
Tipo: str o StructType
Definizione dello schema facoltativa per la tabella. Gli schemi possono essere definiti come una stringa SQL DDL o con il linguaggio Python StructType.
temporary
Tipo: bool
Creare una tabella, ma non pubblicare i metadati per la tabella. La parola chiave temporary indica a DLT di creare una tabella disponibile per la pipeline, ma non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, una tabella temporanea viene mantenuta per la durata della pipeline che lo crea e non solo per un singolo aggiornamento.
Il valore predefinito è "False".
row_filter (anteprima pubblica)
Tipo: str
Clausola di filtro di riga facoltativa per la tabella. Vedere Pubblicare tabelle con filtri di riga e maschere di colonna.
Definizione di tabella o vista
def <function-name>()
Funzione Python che definisce il set di dati. Se il parametro name non è impostato, <function-name> viene usato come nome del set di dati di destinazione.
query
Istruzione SPARK SQL che restituisce un set di dati Spark o un dataframe Koalas.
Usare dlt.read() o spark.read.table() per eseguire una lettura completa da un set di dati definito nella stessa pipeline. Per leggere un set di dati esterno, usare la funzione spark.read.table(). Non è possibile usare dlt.read() per leggere set di dati esterni. Poiché spark.read.table() può essere usato per leggere set di dati interni, set di dati definiti all'esterno della pipeline corrente e consente di specificare le opzioni per la lettura dei dati, Databricks consiglia di usarlo anziché la funzione dlt.read().
Quando si definisce un set di dati in una pipeline, per impostazione predefinita userà il catalogo e lo schema definiti nella configurazione della pipeline. È possibile usare la funzione spark.read.table() per leggere da un dataset definito nella pipeline senza ulteriori specificazioni. Ad esempio, per leggere da un set di dati denominato customers:
spark.read.table("customers")
È anche possibile usare la funzione spark.read.table() per leggere da una tabella registrata nel metastore qualificando facoltativamente il nome della tabella con il nome del database:
spark.read.table("sales.customers")
Usare dlt.read_stream() o spark.readStream.table() per eseguire una lettura di streaming da un set di dati definito nella stessa pipeline. Per eseguire una lettura di streaming da un set di dati esterno, utilizzare
La funzione spark.readStream.table(). Poiché spark.readStream.table() può essere usato per leggere set di dati interni, set di dati definiti all'esterno della pipeline corrente e consente di specificare le opzioni per la lettura dei dati, Databricks consiglia di usarlo anziché la funzione dlt.read_stream().
Per definire una query in una funzione table DLT usando la sintassi SQL, usare la funzione spark.sql. Vedere esempio: Accedere a un set di dati usando spark.sql. Per definire una query nella funzione DLT table utilizzando la sintassi PySpark in Python, usare.
Aspettative
@expect("description", "constraint")
Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola le aspettative, includere la riga nel set di dati di destinazione.
@expect_or_drop("description", "constraint")
Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola le aspettative, eliminare la riga dal set di dati di destinazione.
@expect_or_fail("description", "constraint")
Dichiarare un vincolo di qualità dei dati identificato da
description. Se una riga viola l'aspettativa, ferma immediatamente l'esecuzione.
@expect_all(expectations)
Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, includere la riga nel set di dati di destinazione.
@expect_all_or_drop(expectations)
Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, eliminare la riga dal set di dati di destinazione.
@expect_all_or_fail(expectations)
Dichiarare uno o più vincoli di qualità dei dati.
expectations è un dizionario Python, dove la chiave è la descrizione delle aspettative e il valore è il vincolo delle aspettative. Se una riga viola una qualsiasi delle aspettative, arrestare immediatamente l'esecuzione.

Change Data Capture da un feed di modifiche con Python in DLT

Usare la funzione apply_changes() nell'API Python per usare la funzionalità DLT Change Data Capture (CDC) per elaborare i dati di origine da un feed di dati delle modifiche (CDF).

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes(), è necessario includere le colonne __START_AT e __END_AT con lo stesso tipo di dati dei campi sequence_by.

Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python DLT.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Nota

Per l'elaborazione APPLY CHANGES, il comportamento predefinito per gli eventi INSERT e UPDATE consiste nell'upsert eventi CDC dall'origine: aggiornare tutte le righe nella tabella di destinazione che corrispondono alle chiavi specificate o inserire una nuova riga quando non esiste un record corrispondente nella tabella di destinazione. La gestione degli eventi di DELETE può essere specificata con la condizione APPLY AS DELETE WHEN.

Per ulteriori informazioni sull'elaborazione CDC con un feed di modifiche, vedere LE API APPLY CHANGES: Semplificare l'acquisizione dei dati di modifica con DLT. Per un esempio di uso della funzione apply_changes(), vedere Esempio: elaborazione dei dati di tipo SCD 1 e SCD 2 con dati di origine CDF.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes, è necessario includere le colonne __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Consulta le API APPLY CHANGES: Semplificare la cattura dei dati modificati con DLT.

Argomenti
target
Tipo: str
Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la funzione apply_changes().
Questo parametro è obbligatorio.
source
Tipo: str
Fonte dati contenente record CDC.
Questo parametro è obbligatorio.
keys
Tipo: list
Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.
È possibile specificare uno dei due valori seguenti:
  • Elenco di stringhe: ["userId", "orderId"]
  • Elenco delle funzioni di col() Spark SQL: [col("userId"), col("orderId"]

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
Questo parametro è obbligatorio.
sequence_by
Tipo: str o col()
Nome della colonna che specifica l'ordine logico degli eventi CDC nei dati di origine. DLT usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.
È possibile specificare uno dei due valori seguenti:
  • Stringa: "sequenceNum"
  • Funzione SQL Spark col(): col("sequenceNum")

Gli argomenti per col() funzioni non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
La colonna specificata deve essere un tipo di dati ordinabile.
Questo parametro è obbligatorio.
ignore_null_updates
Tipo: bool
Consentire l'inserimento di aggiornamenti contenenti un sottoinsieme delle colonne di destinazione. Quando un evento CDC corrisponde a una riga esistente e ignore_null_updates è True, le colonne con un null mantengono i valori esistenti nella destinazione. Questo vale anche per le colonne nidificate con un valore di null. Quando ignore_null_updates è False, i valori esistenti vengono sovrascritti con valori di null.
Questo parametro è facoltativo.
Il valore predefinito è False.
apply_as_deletes
Tipo: str o expr()
Specifica quando un evento CDC deve essere considerato come DELETE anziché come un'operazione di inserimento o aggiornamento. Per gestire i dati non ordinati, la riga cancellata viene trattenuta temporaneamente come marcatore di cancellazione nella tabella Delta sottostante e viene creata una vista nel metastore che filtra tali marcatori. L'intervallo di conservazione può essere configurato con
pipelines.cdc.tombstoneGCThresholdInSeconds proprietà della tabella.
È possibile specificare uno dei due valori seguenti:
  • Stringa: "Operation = 'DELETE'"
  • Una funzione expr() di SQL Spark: expr("Operation = 'DELETE'")

Questo parametro è facoltativo.
apply_as_truncates
Tipo: str o expr()
Specifica quando un evento CDC deve essere considerato come una tabella completa TRUNCATE. Poiché questa clausola attiva un troncamento completo della tabella di destinazione, deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.
Il parametro apply_as_truncates è supportato solo per scD di tipo 1. Il tipo SCD 2 non supporta le operazioni di troncamento.
È possibile specificare uno dei due valori seguenti:
  • Stringa: "Operation = 'TRUNCATE'"
  • Funzione Spark SQL expr(): expr("Operation = 'TRUNCATE'")

Questo parametro è facoltativo.
column_list
except_column_list
Tipo: list
Subset di colonne da includere nella tabella di destinazione. Usare column_list per specificare l'elenco completo di colonne da includere. Usare except_column_list per specificare le colonne da escludere. È possibile dichiarare un valore come elenco di stringhe o come funzioni di spark SQL col():
  • column_list = ["userId", "name", "city"].
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
Questo parametro è facoltativo.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando non viene passato alcun argomento column_list o except_column_list alla funzione.
stored_as_scd_type
Tipo: str o int
Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.
Impostare su 1 per SCD di tipo 1 o 2 per SCD di tipo 2.
Questa clausola è facoltativa.
Il valore predefinito è SCD di tipo 1.
track_history_column_list
track_history_except_column_list
Tipo: list
Un sottoinsieme di colonne di output di cui tenere traccia della cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo delle colonne da tenere traccia. Usare
track_history_except_column_list specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni di spark SQL col():
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti a col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
Questo parametro è facoltativo.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando non è presente track_history_column_list o ...
track_history_except_column_list parametro viene passato alla funzione.

Cattura delle modifiche dai snapshot del database utilizzando Python in DLT

Importante

L'API APPLY CHANGES FROM SNAPSHOT si trova in anteprima pubblica.

Usare la funzione apply_changes_from_snapshot() nell'API Python per usare la funzionalità DLT Change Data Capture (CDC) per elaborare i dati di origine dagli snapshot del database.

Importante

È necessario dichiarare una tabella di streaming di destinazione in cui applicare le modifiche. Facoltativamente, è possibile specificare lo schema per la tabella di destinazione. Quando si specifica lo schema della tabella di destinazione apply_changes_from_snapshot(), è necessario includere anche le colonne __START_AT e __END_AT con lo stesso tipo di dati del campo sequence_by.

Per creare la tabella di destinazione necessaria, è possibile usare la funzione create_streaming_table() nell'interfaccia Python DLT.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Nota

Per l'elaborazione APPLY CHANGES FROM SNAPSHOT, il comportamento predefinito consiste nell'inserire una nuova riga quando un record corrispondente con le stesse chiavi non esiste nella destinazione. Se esiste un record corrispondente, viene aggiornato solo se uno dei valori nella riga è stato modificato. Le righe con chiavi presenti nella destinazione ma non più presenti nell'origine vengono eliminate.

Per ulteriori informazioni sull'elaborazione CDC con snapshot, vedere le API APPLY CHANGES: Semplifica Change Data Capture con DLT. Per esempi sull'uso della funzione apply_changes_from_snapshot(), vedere gli esempi di inserimento di snapshot periodici e di inserimento di snapshot cronologici .

Argomenti
target
Tipo: str
Nome della tabella da aggiornare. È possibile usare la funzione create_streaming_table() per creare la tabella di destinazione prima di eseguire la funzione apply_changes().
Questo parametro è obbligatorio.
source
Tipo: str o lambda function
Il nome di una tabella o di una vista per creare periodicamente uno snapshot, oppure una funzione lambda Python che restituisce il DataFrame dello snapshot da elaborare e la versione dello snapshot. Vedere Implementazione dell'argomento source.
Questo parametro è obbligatorio.
keys
Tipo: list
Colonna o combinazione di colonne che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nella tabella di destinazione.
È possibile specificare uno dei due valori seguenti:
  • Elenco di stringhe: ["userId", "orderId"]
  • Elenco delle funzioni di col() Spark SQL: [col("userId"), col("orderId"]

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
Questo parametro è obbligatorio.
stored_as_scd_type
Tipo: str o int
Indica se archiviare i record come scD di tipo 1 o SCD di tipo 2.
Impostare il valore su 1 per SCD di tipo 1 o 2 per SCD di tipo 2.
Questa clausola è facoltativa.
Il valore predefinito è SCD di tipo 1.
track_history_column_list
track_history_except_column_list
Tipo: list
Un sottoinsieme di colonne di output da monitorare per la cronologia nella tabella di destinazione. Utilizzare track_history_column_list per specificare l'elenco completo delle colonne da tenere traccia. Usare
track_history_except_column_list specificare le colonne da escludere dal rilevamento. È possibile dichiarare un valore come elenco di stringhe o come funzioni di spark SQL col():
  • track_history_column_list = ["userId", "name", "city"].
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Gli argomenti delle funzioni col() non possono includere qualificatori. Ad esempio, è possibile usare col(userId), ma non è possibile usare col(source.userId).
Questo parametro è facoltativo.
L'impostazione predefinita consiste nell'includere tutte le colonne nella tabella di destinazione quando non è specificato track_history_column_list o...
track_history_except_column_list parametro viene passato alla funzione.

Implementare l'argomento source

La funzione apply_changes_from_snapshot() include l'argomento source. Per l'elaborazione degli snapshot cronologici, l'argomento source deve essere una funzione lambda Python che restituisce due valori alla funzione apply_changes_from_snapshot(): un dataframe Python contenente i dati dello snapshot da elaborare e una versione snapshot.

Di seguito è riportata la firma della funzione lambda:

lambda Any => Optional[(DataFrame, Any)]
  • L'argomento della funzione lambda è la versione snapshot elaborata più di recente.
  • Il valore restituito della funzione lambda è None o una tupla di due valori: il primo valore della tupla è un dataframe contenente lo snapshot da elaborare. Il secondo valore della tupla rappresenta la versione dello snapshot che indica l'ordine logico dello snapshot.

Esempio che implementa e chiama la funzione lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Il runtime DLT esegue i passaggi seguenti ogni volta che viene attivata la pipeline che contiene la funzione apply_changes_from_snapshot():

  1. Esegue la funzione next_snapshot_and_version per caricare il dataframe dello snapshot successivo e la versione snapshot corrispondente.
  2. Se non viene restituito alcun dataframe, l'esecuzione viene terminata e l'aggiornamento della pipeline viene contrassegnato come completo.
  3. Rileva le modifiche nel nuovo snapshot e le applica in modo incrementale alla tabella di destinazione.
  4. Torna al passaggio 1 per caricare lo snapshot successivo e la relativa versione.

limitazioni

L'interfaccia DLT Python presenta la limitazione seguente:

La funzione pivot() non è supportata. L'operazione pivot in Spark richiede il caricamento anticipato dei dati di input per calcolare lo schema di output. Questa funzionalità non è supportata in DLT.