Condividi tramite


Carica ed elabora i dati in modo incrementale con i flussi delle Delta Live Tables

Questo articolo illustra cosa sono i flussi e come utilizzare i flussi nelle pipeline di Delta Live Tables per elaborare in modo incrementale i dati da un'origine verso una tabella di streaming di destinazione. In Tabelle live Delta i flussi vengono definiti in due modi:

  1. Un flusso viene definito automaticamente quando si crea una query che aggiorna una tabella di streaming.
  2. Le tabelle live delta forniscono anche funzionalità per definire in modo esplicito i flussi per un'elaborazione più complessa, ad esempio l'aggiunta a una tabella di streaming da più origini di streaming.

Questo articolo illustra i flussi impliciti creati quando si definisce una query per aggiornare una tabella di streaming e vengono quindi forniti dettagli sulla sintassi per definire flussi più complessi.

Che cos'è un flusso?

In Delta Live Tables, un flusso è una query di streaming che elabora i dati di origine in modo incrementale per aggiornare una tabella di streaming di destinazione. La maggior parte degli insiemi di dati Delta Live Tables creati in una pipeline definisce il flusso come parte della query e non richiede di definire esplicitamente il flusso. Ad esempio, si crea una tabella di streaming in Tabelle Live Delta in un singolo comando DDL anziché usare istruzioni di tabella e flusso separate per creare la tabella di streaming:

Nota

Questo esempio CREATE FLOW viene fornito solo a scopo illustrativo e include parole chiave che non sono una sintassi delta live tables valida.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Oltre al flusso predefinito definito da una query, le interfacce Python e SQL di Delta Live Tables forniscono funzionalità di accodamento. Il flusso di aggiunta supporta l'elaborazione che richiede la lettura di dati da più fonti di streaming per aggiornare una tabella di streaming. Ad esempio, è possibile usare la funzionalità di aggiunta di flussi quando si dispone di una tabella di streaming e un flusso esistenti e si vuole aggiungere una nuova origine di streaming che aggiorna questa tabella di streaming esistente.

Usare il flusso di accodamento per scrivere in una tabella di streaming da più flussi di origine

Usare il decoratore @append_flow nell'interfaccia Python o la clausola CREATE FLOW nell'interfaccia SQL per scrivere in una tabella di streaming attingendo da più origini di streaming. Usare il flusso di accodamento per l'elaborazione di attività come le seguenti:

  • Aggiungere origini di streaming che aggiungono dati a una tabella di streaming esistente senza richiedere un aggiornamento completo. Ad esempio, potrebbe esserci una tabella che combina i dati regionali di ogni regione in cui si opera. Man mano che vengono implementate nuove aree, è possibile aggiungere i dati della nuova area alla tabella senza eseguire un aggiornamento completo. Consultare esempio: Scrivere su una tabella di streaming da diversi temi Kafka.
  • Aggiornare una tabella di streaming aggiungendo dati cronologici mancanti (riempimento). Ad esempio, si dispone di una tabella di streaming esistente scritta in da un argomento Apache Kafka. I dati cronologici sono archiviati anche in una tabella che è necessario inserire esattamente una volta nella tabella di streaming e non è possibile trasmettere i dati perché l'elaborazione include l'esecuzione di un'aggregazione complessa prima di inserire i dati. Vedere Esempio: Eseguire un backfill di dati monouso.
  • Combinare dati da più origini e scrivere in una singola tabella di streaming anziché usare la clausola UNION in una query. L'elaborazione del flusso di accodamento anziché UNION consente di aggiornare la tabella di destinazione in modo incrementale senza eseguire un aggiornamento completo . Vedere Esempio: Usare l'elaborazione del flusso di accodamento anziché UNION.

La destinazione per i record restituiti dall'elaborazione del flusso di accodamento può essere una tabella esistente o una nuova tabella. Per le query Python, usare la funzione create_streaming_table() per creare una tabella di destinazione.

Importante

  • Se devi definire vincoli di qualità dei dati con aspettative , definisci le aspettative sulla tabella di destinazione come parte della funzione create_streaming_table() o su una definizione di tabella esistente. Non è possibile definire le aspettative nella @append_flow definizione.
  • I flussi vengono identificati da un nome di flusso e questo nome viene usato per identificare i checkpoint di streaming. L'uso del nome del flusso per identificare il checkpoint indica quanto segue:
    • Se un flusso esistente in una pipeline viene rinominato, il checkpoint non viene portato avanti e il flusso rinominato è effettivamente un flusso completamente nuovo.
    • Non è possibile riutilizzare un nome di flusso in una pipeline, perché il checkpoint esistente non corrisponde alla nuova definizione del flusso.

Di seguito è riportata la sintassi per @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

esempio : Scrivere in una tabella di streaming da più argomenti Kafka

Gli esempi seguenti creano una tabella di streaming denominata kafka_target e scrive in tale tabella di streaming da due argomenti Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Per altre informazioni sulla funzione con valori di tabella read_kafka() usata nelle query SQL, vedere read_kafka nelle informazioni di riferimento sul linguaggio SQL.

In Python è possibile creare più flussi a livello di codice destinati a una singola tabella. Nell'esempio seguente viene illustrato questo modello per un elenco di argomenti Kafka.

Nota

Questo modello ha gli stessi requisiti dell'uso di un ciclo for per creare tabelle. È necessario passare in modo esplicito un valore Python alla funzione che definisce il flusso. Vedere Creare tabelle in un ciclo for.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Esempio: Eseguire un riempimento dati monouso

Gli esempi seguenti eseguono una query per aggiungere dati cronologici a una tabella di streaming:

Nota

Per garantire un backfill unico quando la query di backfill fa parte di una pipeline che viene eseguita su base pianificata o continuamente, rimuovere la query dopo aver eseguito la pipeline una sola volta. Per aggiungere nuovi dati se arrivano nella directory backfill, lasciare la query sul posto.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Esempio: usare l'elaborazione del flusso di accodamento anziché UNION

Anziché utilizzare una query con una clausola UNION, è possibile utilizzare query di accodamento per combinare più origini e scrivere in una singola tabella di streaming. L'uso di query di appendimento al posto di UNION consente di aggiungere dati a una tabella di streaming da più origini senza eseguire un aggiornamento completo .

L'esempio python seguente include una query che combina più origini dati con una UNION clausola :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Gli esempi seguenti sostituiscono la UNION query con le query del flusso di accodamento:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );