Delen via


Gegevens incrementeel laden en verwerken met DLT-stromen

In dit artikel wordt uitgelegd wat stromen zijn en hoe u stromen in DLT-pijplijnen kunt gebruiken om gegevens van een bron naar een doelstreamingtabel incrementeel te verwerken. In DLT worden stromen op twee manieren gedefinieerd:

  1. Een stroom wordt automatisch gedefinieerd wanneer u een query maakt waarmee een streamingtabel wordt bijgewerkt.
  2. DLT biedt ook functionaliteit om stromen expliciet te definiëren voor complexere verwerking, zoals toevoegen aan een streamingtabel vanuit meerdere streamingbronnen.

In dit artikel worden de impliciete stromen besproken die worden gemaakt wanneer u een query definieert voor het bijwerken van een streamingtabel en vindt u vervolgens details over de syntaxis om complexere stromen te definiëren.

Wat is een stroom?

In DLT is een flow een streamingquery die brongegevens incrementeel verwerkt om een doelstreamingtabel bij te werken. De meeste DLT-gegevenssets die u in een pijplijn maakt, definiëren de stroom als onderdeel van de query en hoeven de stroom niet expliciet te definiëren. U maakt bijvoorbeeld een streamingtabel in DLT in één DDL-opdracht in plaats van afzonderlijke tabel- en stroominstructies te gebruiken om de streamingtabel te maken:

Notitie

Dit CREATE FLOW voorbeeld wordt alleen ter illustratie gegeven en bevat trefwoorden die geen geldige DLT-syntaxis zijn.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Naast de standaardstroom die is gedefinieerd door een query, bieden de DLT Python- en SQL-interfaces toevoegstroom functionaliteit. Stroom voor toevoegingen ondersteunt verwerking die vereist dat gegevens uit meerdere streamingbronnen worden gelezen om een enkele streamingtabel bij te werken. U kunt bijvoorbeeld toevoegstroomfunctionaliteit gebruiken wanneer u een bestaande streamingtabel en stroom hebt en een nieuwe streamingbron wilt toevoegen die naar deze bestaande streamingtabel schrijft.

Toevoegstroom gebruiken om vanuit meerdere bronstreams naar een streamingtabel te schrijven

Gebruik de @append_flow-decorator in de Python-interface of de CREATE FLOW-clausule in de SQL-interface om naar een streamingtabel te schrijven vanuit meerdere streamingbronnen. Gebruik toevoegstroom voor het uitvoeren van taken, zoals de volgende:

  • Voeg streamingbronnen toe die gegevens toevoegen aan een bestaande streamingtabel zonder dat u een volledige vernieuwing nodig hebt. U hebt bijvoorbeeld een tabel waarin regionale gegevens worden gecombineerd van elke regio waarin u werkt. Wanneer er nieuwe regio's worden geïmplementeerd, kunt u de nieuwe regiogegevens toevoegen aan de tabel zonder dat u een volledige vernieuwing hoeft uit te voeren. Zie Voorbeeld: Schrijven vanuit meerdere Kafka-topics naar een streamingtabel.
  • Werk een streamingtabel bij door ontbrekende historische gegevens toe te voegen (backfilling). U hebt bijvoorbeeld een bestaande streamingtabel die wordt bijgewerkt door een Apache Kafka-onderwerp. U hebt ook historische gegevens opgeslagen in een tabel die u precies eenmaal in de streamingtabel hebt ingevoegd en u kunt de gegevens niet streamen omdat uw verwerking een complexe aggregatie omvat voordat u de gegevens invoegt. Zie Voorbeeld: Een eenmalige backfill van gegevens uitvoeren.
  • Combineer gegevens uit meerdere bronnen en schrijf naar één streamingtabel in plaats van de UNION component in een query te gebruiken. Als u toevoegstroomverwerking gebruikt in plaats van UNION kunt u de doeltabel incrementeel bijwerken zonder een volledige vernieuwingsupdate uit te voeren. Zie Voorbeeld: Gebruik append-flowverwerking in plaats van UNION.

Het doel voor de recorduitvoer door de verwerking van de toevoegstroom kan een bestaande tabel of een nieuwe tabel zijn. Gebruik voor Python-query's de create_streaming_table() functie om een doeltabel te maken.

Belangrijk

  • Als u beperkingen voor gegevenskwaliteit wilt definiëren met verwachtingen, definieert u de verwachtingen voor de doeltabel als onderdeel van de create_streaming_table() functie of in een bestaande tabeldefinitie. U kunt geen verwachtingen definiëren in de definitie van @append_flow.
  • Stromen worden geïdentificeerd door een stroomnaamen deze naam wordt gebruikt om streamingcontrolepunten te identificeren. Het gebruik van de stroomnaam om het controlepunt te identificeren betekent het volgende:
    • Wanneer een bestaande workflow in een pijplijn wordt hernoemd, wordt het controlepunt niet overgedragen en geldt de hernoemde workflow effectief als een volledig nieuwe workflow.
    • U kunt een stroomnaam niet opnieuw gebruiken in een pijplijn, omdat het bestaande controlepunt niet overeenkomt met de nieuwe stroomdefinitie.

Hier volgt de syntaxis voor @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Voorbeeld: schrijven naar een streamingtabel vanuit meerdere Kafka-topics

In de volgende voorbeelden wordt een streamingtabel met de naam kafka_target gemaakt en naar die streamingtabel geschreven vanuit twee Kafka-onderwerpen:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Zie read_kafka in de sql-taalverwijzing voor meer informatie over de read_kafka() tabelwaardefunctie die wordt gebruikt in de SQL-query's.

In Python kunt u programmatisch meerdere stromen maken die gericht zijn op één tabel. In het volgende voorbeeld ziet u dit patroon voor een lijst met Kafka-onderwerpen.

Notitie

Dit patroon heeft dezelfde vereisten als het gebruik van een for lus om tabellen te maken. U moet expliciet een Python-waarde doorgeven aan de functie die de stroom definieert. Zie Tabellen maken in een for-lus.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

voorbeeld: Een eenmalige backfill van gegevens uitvoeren

In de volgende voorbeelden wordt een query uitgevoerd om historische gegevens toe te voegen aan een streamingtabel:

Notitie

Als u een echte eenmalige backfill wilt garanderen wanneer de backfillquery deel uitmaakt van een pijplijn die op geplande basis of continu wordt uitgevoerd, verwijdert u de query nadat de pijplijn eenmaal is uitgevoerd. Als u nieuwe gegevens wilt toevoegen wanneer er nieuwe gegevens binnenkomen in de backfill-map, laat u de query op zijn plek.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

voorbeeld: Toevoegstroomverwerking gebruiken in plaats van UNION

In plaats van een query met een UNION-component te gebruiken, kunt u toevoegstroomquery's gebruiken om meerdere bronnen te combineren en naar één streamingtabel te schrijven. Met behulp van toevoegstroomquery's in plaats van UNION kunt u vanuit meerdere bronnen toevoegen aan een streamingtabel zonder een volledige vernieuwinguit te voeren.

Het volgende Python-voorbeeld bevat een query die meerdere gegevensbronnen combineert met een UNION-component:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

De volgende voorbeelden vervangen de UNION-query met queries voor toevoegstromen.

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );