Поделиться через


Постепенная загрузка и обработка данных в потоках Delta Live Tables

В этой статье объясняется, что такое потоки данных и как можно использовать их в конвейерах Delta Live Tables для поэтапной обработки данных из источника в целевую потоковую таблицу. В Delta Live Tables потоки определяются двумя способами:

  1. Поток определяется автоматически при создании запроса, обновляющего потоковую таблицу.
  2. Delta Live Tables также предоставляет возможности для явного определения потоков для более сложной обработки, например, добавления данных из нескольких потоковых источников в потоковую таблицу.

В этой статье рассматриваются неявные потоки, созданные при определении запроса для обновления таблицы потоковой передачи, а затем содержатся сведения о синтаксисе для определения более сложных потоков.

Что такое поток?

В Delta Live Tables поток — это потоковый запрос, который обрабатывает исходные данные инкрементно для обновления целевой потоковой таблицы. Большинство наборов данных Delta Live Tables, создаваемых в конвейере, определяют поток как часть запроса и не требуют явного определения потока. Например, вы создаете потоковую таблицу в Delta Live Tables с помощью одной команды DDL вместо использования отдельных инструкций таблицы и потока для этого.

Примечание.

Этот CREATE FLOW пример предоставляется только для иллюстрирующих целей и содержит ключевые слова, которые не являются допустимыми синтаксисом Delta Live Table.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Помимо потока по умолчанию, определенного запросом, интерфейсы Python и SQL Delta Live Tables предоставляют функции потока добавления. Поток добавления поддерживает обработку, требующую чтения данных из нескольких источников потоковой передачи для обновления одной потоковой таблицы. Например, можно использовать функциональность добавления потока, если у вас имеется существующая потоковая таблица и поток, и вы хотите добавить новый источник потоковой передачи, которая будет записывать в эту существующую потоковую таблицу.

Используйте потоковое добавление для записи в потоковую таблицу из нескольких исходных потоков

Используйте декоратор @append_flow в интерфейсе Python или предложение CREATE FLOW в интерфейсе SQL для записи в потоковую таблицу из нескольких потоковых источников. Используйте поток добавления для задач обработки, таких как:

  • Добавьте источники потоковой передачи, добавляющие данные в существующую потоковую таблицу без полного обновления. Например, у вас может быть таблица, объединяющая региональные данные из каждого региона, в который вы работаете. При развертывании новых регионов можно добавить новые данные региона в таблицу без полного обновления. См. Пример: запись в потоковую таблицу из нескольких тем Kafka.
  • Обновите потоковую таблицу дополнением отсутствующих исторических данных (обратное заполнение). Например, у вас есть существующая таблица потоковой передачи, записанная в раздел Apache Kafka. У вас также есть исторические данные, хранящиеся в таблице, которые нужно вставить ровно один раз в таблицу потоковой обработки, и вы не можете передавать данные, так как обработка включает в себя выполнение сложной агрегации перед вставкой данных. См . пример: выполнение однократного резервного заполнения данных.
  • Объедините данные из нескольких источников и запишите их в одну потоковую таблицу вместо того чтобы использовать предложение UNION в запросе. Использование обработки потока добавления вместо UNION позволяет обновлять целевую таблицу постепенно без выполнения полного обновления. См . пример. Использование обработки потока добавления вместо UNION.

Целевой объект для выхода записей, обработанных потоком добавления, может быть существующей таблицей или новой таблицей. Для запросов Python используйте функцию create_streaming_table() для создания целевой таблицы.

Внимание

  • Если необходимо определить ограничения качества данных при ожиданиях, определите ожидания для целевой таблицы в рамках функции create_streaming_table() или на основании существующего определения таблицы. Вы не можете определить ожидания в определении @append_flow .
  • Потоки определяются именем потока, и это имя используется для идентификации контрольных точек потоковой передачи. Использование имени потока для идентификации контрольной точки означает следующее:
    • Если существующий поток в конвейере переименован, контрольная точка не переносится, и переименованный поток фактически является совершенно новым потоком.
    • Невозможно повторно использовать имя потока в конвейере, так как существующая контрольная точка не будет соответствовать новому определению потока.

Ниже приведен синтаксис для @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Пример: Запись в потоковую таблицу из нескольких топиков Kafka

В следующем примере создается потоковая таблица с именем kafka_target и производится запись в эту таблицу потоковой передачи из двух топиков Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Дополнительные сведения о табличной функции read_kafka(), используемой в запросах SQL, см. в разделе read_kafka справочника по языку SQL.

В Python можно программно создать несколько потоков, предназначенных для одной таблицы. В следующем примере показан этот шаблон для списка разделов Kafka.

Примечание.

Этот шаблон имеет те же требования, что и при использовании цикла for для создания таблиц. Необходимо явно передать значение Python в функцию, определяющую поток. См. раздел Создание таблиц в цикле.

import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Пример. Запуск одноразовой обратной заполнения данных

В следующих примерах выполняется запрос для добавления исторических данных в таблицу потоковой передачи:

Примечание.

Чтобы обеспечить настоящее однократное дозаполнение, когда запрос обратного заполнения является частью конвейера, работающего по расписанию или в непрерывном режиме, удалите запрос после однократного запуска конвейера. Чтобы добавить новые данные, если он поступает в каталог обратной заполнения, оставьте запрос на месте.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Пример. Использование обработки потока добавления вместо UNION

Вместо использования запроса с предложением UNION можно использовать запросы добавления для объединения нескольких источников и записи в одну потоковую таблицу. Использование запросов на добавление в поток вместо UNION позволяет добавлять данные в потоковую таблицу из нескольких источников без выполнения полного обновления.

В следующем примере Python содержится запрос, который объединяет несколько источников данных с предложением UNION :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

В следующих примерах запрос заменится запросом UNION на запросы потока добавления:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );