Dela via


Läsa in och bearbeta data stegvis med Delta Live Tables-flöden

Den här artikeln förklarar vad flöden är och hur du kan använda flöden i Delta Live Tables-pipelines för att stegvis bearbeta data från en källa till en målströmningstabell. I Delta Live Tables definieras flöden på två sätt:

  1. Ett flöde definieras automatiskt när du skapar en fråga som uppdaterar en strömmande tabell.
  2. Delta Live Tables innehåller också funktioner för att explicit definiera flöden för mer komplex bearbetning, till exempel lägga till en strömningstabell från flera strömningskällor.

I den här artikeln beskrivs de implicita flöden som skapas när du definierar en fråga för att uppdatera en strömmande tabell och innehåller sedan information om syntaxen för att definiera mer komplexa flöden.

Vad är ett flöde?

I Delta Live Tables är ett flöde en strömmande fråga som bearbetar källdata stegvis för att uppdatera en målströmningstabell. De flesta Delta Live Tables-datauppsättningar som du skapar i en pipeline definierar flödet som en del av frågan och kräver inte att flödet uttryckligen definieras. Du kan till exempel skapa en strömmande tabell i Delta Live Tables i ett enda DDL-kommando i stället för att använda separata tabell- och flödesinstruktioner för att skapa strömningstabellen:

Kommentar

Det här CREATE FLOW exemplet tillhandahålls endast i illustrativa syften och innehåller nyckelord som inte är giltiga Delta Live Tables-syntax.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Förutom det standardflöde som definieras av en fråga tillhandahåller Delta Live Tables Python- och SQL-gränssnitten tilläggsflöde funktioner. Tilläggsflödet stöder bearbetning som kräver läsning av data från flera strömmande källor för att uppdatera en enda strömmande tabell. Du kan till exempel använda funktionen lägg till flöde när du har en befintlig strömmande tabell och ett befintligt flöde och vill lägga till en ny strömmande källa som skriver till den befintliga strömningstabellen.

Använd tilläggsflöde för att skriva till en strömmande tabell från flera källströmmar

Använd @append_flow-dekoreraren i Python-gränssnittet eller CREATE FLOW-satsen i SQL-gränssnittet för att skriva till en strömmande tabell från flera strömmande källor. Använd tilläggsflödet för bearbetning av uppgifter, till exempel följande:

  • Lägg till strömmande källor som lägger till data i en befintlig strömmande tabell utan att kräva en fullständig uppdatering. Du kan till exempel ha en tabell som kombinerar regionala data från varje region som du arbetar i. När nya regioner distribueras kan du lägga till nya regiondata i tabellen utan att utföra en fullständig uppdatering. Se Exempel: Skriv till en strömtabell från flera Kafka-ämnen.
  • Uppdatera en strömmande tabell genom att lägga till saknade historiska data (återfyllnad). Du har till exempel en befintlig strömmande tabell som skrivs till av ett Apache Kafka-topic. Du har också historiska data lagrade i en tabell som du behöver infoga exakt en gång i strömningstabellen, och du kan inte strömma data eftersom bearbetningen innefattar att utföra en komplex aggregering innan du infogar data. Se Exempel: Kör en engångsdatapåfyllning.
  • Kombinera data från flera källor och skriv till en enda strömmande tabell i stället för att använda UNION-satsen i en fråga. Med användning av tilläggsflöde i stället för UNION kan du uppdatera måltabellen stegvis utan att köra en fullständig uppdatering. Se Exempel: Använd bearbetning av tilläggsflöde i stället för UNION.

Målet för posterna som matas ut av bearbetningen av tilläggsflödet kan vara en befintlig tabell eller en ny tabell. För Python-frågor använder du funktionen create_streaming_table() för att skapa en måltabell.

Viktigt!

  • Om du behöver definiera datakvalitetsbegränsningar med förväntningardefinierar du förväntningarna på måltabellen som en del av funktionen create_streaming_table() eller en befintlig tabelldefinition. Du kan inte definiera förväntningar i @append_flow definitionen.
  • Flöden identifieras med ett flödesnamn och det här namnet används för att identifiera kontrollpunkter för strömning. Användning av flödesnamnet för att identifiera kontrollpunkten innebär följande:
    • Om ett befintligt flöde i en pipeline byter namn överförs inte kontrollpunkten och det omdöpta flödet är i praktiken ett helt nytt flöde.
    • Du kan inte återanvända ett flödesnamn i en pipeline eftersom den befintliga kontrollpunkten inte matchar den nya flödesdefinitionen.

Följande är syntaxen för @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Exempel: Skriva till en strömmande tabell från flera Kafka-ämnen

I följande exempel skapas en strömningstabell med namnet kafka_target och data skrivs till den från två Kafka-ämnen.

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Mer information om den read_kafka() tabellvärdesfunktion som används i SQL-frågorna finns i read_kafka i SQL-språkreferensen.

I Python kan du programmatiskt skapa flera flöden som riktar sig mot en enda tabell. I följande exempel visas det här mönstret för en lista över Kafka-ämnen.

Kommentar

Det här mönstret har samma krav som att använda en for-loop för att skapa tabeller. Du måste uttryckligen skicka ett Python-värde till funktionen som definierar flödet. Se Skapa tabeller i en for-loop.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Exempel: Kör en engångsdatapåfyllning

I följande exempel körs en fråga för att lägga till historiska data i en strömmande tabell:

Kommentar

Ta bort frågan när du har kört pipelinen en gång för att säkerställa en sann engångsefterfyllnad när återfyllnadsfrågan är en del av en pipeline som körs enligt schemat eller kontinuerligt. Om du vill lägga till nya data om de kommer till katalogen för återfyllnad lämnar du frågan på plats.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Exempel: Använd bearbetning av tilläggsflöde i stället för UNION

I stället för att använda en fråga med en UNION-sats kan du använda tilläggsflödesfrågor för att kombinera flera källor och skriva till en enda direktuppspelningstabell. Med hjälp av tilläggsflödesfrågor i stället för UNION kan du lägga till data i en strömningstabell från flera källor utan att behöva köra en fullständig uppdatering .

Följande Python-exempel innehåller en fråga som kombinerar flera datakällor med en UNION sats:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Följande exempel ersätter UNION frågan med tilläggsflödesfrågor:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );