다음을 통해 공유


Delta Live Tables 흐름을 사용하여 증분 방식으로 데이터 로드 및 처리

이 문서는 흐름이란 무엇인지와 Delta Live Tables 파이프라인에서 흐름을 사용하여 원본에서 대상 스트리밍 table으로 데이터를 증분 처리하는 방법을 설명합니다. Delta Live Tables흐름은 다음 두 가지 방법으로 정의됩니다.

  1. 쿼리를 생성하여 스트리밍 table를 업데이트하면 흐름이 자동으로 정의됩니다.
  2. 또한 Delta Live Tables는 여러 스트리밍 원본에서 스트리밍 table에 연결을 추가하는 등 보다 복잡한 처리를 위한 흐름을 명시적으로 정의하도록 하는 기능을 제공합니다.

이 문서에서는 스트리밍 tableupdate 쿼리를 정의할 때 생성되는 암시적 흐름에 대해 설명한 다음 더 복잡한 흐름을 정의하는 구문에 대한 세부 정보를 제공합니다.

흐름이란?

Delta Live Tables흐름 원본 데이터를 증분 방식으로 처리하여 대상 스트리밍 tableupdate 스트리밍 쿼리입니다. 파이프라인에서 만드는 대부분의 Delta Live Tables 데이터 세트는 흐름을 쿼리의 일부로 정의하며 흐름을 명시적으로 정의할 필요가 없습니다. 예를 들어, 별도의 table 및 흐름 문을 사용하여 스트리밍 table을 만드는 대신, 단일 DDL 명령으로 Delta Live Tables에서 스트리밍 table을 만듭니다.

참고 항목

CREATE FLOW 예제는 설명 목적으로만 제공되며 유효한 Delta Live Tables 구문이 아닌 키워드를 포함합니다.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

쿼리로 정의된 기본 흐름 외에도 Delta Live Tables Python 및 SQL 인터페이스는 추가 흐름 기능을 제공합니다. 추가 흐름은 단일 스트리밍 tableupdate 위해 여러 스트리밍 원본에서 데이터를 읽도록 요구하는 처리를 지원합니다. 예를 들어, 기존 스트리밍 table과 흐름이 있을 때 이 기존 스트리밍 table에 기록하는 새 스트리밍 소스를 추가하려면 추가 흐름 기능을 사용할 수 있습니다.

부가 흐름을 사용하여 여러 개의 원본 스트림으로부터 스트리밍 table에 쓰기

Python 인터페이스의 @append_flow 데코레이터 또는 SQL 인터페이스의 CREATE FLOW 절을 사용하여 여러 스트리밍 원본으로부터 스트리밍 table에 기록합니다. 다음과 같은 작업을 처리하는 데 추가 흐름을 사용합니다.

  • 전체 refresh가 필요 없이 기존 스트리밍 table에 데이터를 추가하는 스트리밍 소스를 추가합니다. 예를 들어, 여러분이 운영하는 모든 지역의 지역 데이터를 결합하는 table가 있을 수 있습니다. 새 지역이 도입되면 전체 refresh을 수행하지 않고 새 지역 데이터를 table에 추가할 수 있습니다. 예제:여러 Kafka 토픽의 스트리밍 table 씁니다.
  • 스트리밍 Updatetable 누락된 기록 데이터(백필)를 추가하여. 예를 들어, Apache Kafka 토픽에서 데이터를 기록하는 기존 스트리밍 table이 있습니다. 귀하는 또한 기록 데이터를 table에 저장하고 있으며, 이 데이터를 스트리밍 table에 정확히 한 번 삽입해야 합니다. 그러나 데이터를 삽입하기 전에 복잡한 집계를 수행하는 것이 처리에 포함되므로 데이터를 스트리밍할 수 없습니다. 예제: 일회성 데이터 백필을 실행합니다.
  • 쿼리에서 UNION 절을 사용하는 대신 여러 원본의 데이터를 결합하여 단일 스트리밍 table에 기록하십시오. UNION 대신 추가 흐름 처리를 사용하면, 전체 refreshupdate을 실행하지 않고 대상 table를 증분 방식으로 update할 수 있습니다. 예제 : UNION 대신 추가 흐름 처리를 사용합니다.

추가 흐름 처리에 의한 레코드 출력 대상은 기존 table 또는 새 table수 있습니다. Python 쿼리의 경우 create_streaming_table() 함수를 사용하여 대상 table만듭니다.

Important

  • 기대치데이터 품질 제약 조건을 정의해야 하는 경우 대상 table 대한 기대치를 create_streaming_table() 함수의 일부 또는 기존 table 정의로 정의합니다. 정의에서 @append_flow 기대치를 정의할 수 없습니다.
  • 흐름은 흐름 이름으로 식별되며 이 이름은 스트리밍 검사점을 식별하는 데 사용됩니다. 흐름 이름을 사용하여 검사점을 식별하는 것은 다음을 의미합니다.
    • 파이프라인의 기존 흐름 이름이 바뀌면 검사점이 이월되지 않으며 이름이 바뀐 흐름은 사실상 완전히 새로운 흐름입니다.
    • 기존 검사점이 새 흐름 정의와 일치하지 않으므로 파이프라인에서 흐름 이름을 다시 사용할 수 없습니다.

다음은 다음 구문입니다 @append_flow.

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

예제: 여러 Kafka 토픽에서 스트리밍 table로 쓰기

다음 예제에서는 kafka_target 스트리밍 table 만들고 두 Kafka 토픽에서 해당 스트리밍 table 씁니다.

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL 쿼리에서 사용되는 read_kafka()table값 함수에 대한 자세한 내용은 SQL 언어 참조의 read_kafka를 참조하세요.

예: 일회성 데이터 백필 실행

다음 예제에서는 스트리밍 table에 과거 데이터를 추가하는 쿼리를 실행합니다.

참고 항목

진정한 일회성 백필링을 보장하려면, 백필 쿼리가 예약된 기준으로 또는 지속적으로 실행되는 파이프라인의 일부일 때, 파이프라인을 한 번 실행한 후 쿼리에 remove을 수행하십시오. 백필 디렉터리에 도착하는 경우 새 데이터를 추가하려면 쿼리를 그대로 둡니다.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

예: 대신 추가 흐름 처리 사용 UNION

UNION 절과 함께 쿼리를 사용하는 대신, 추가 흐름 쿼리를 사용하여 여러 원본을 결합하고 단일 스트리밍 table에 출력할 수 있습니다. UNION 대신 추가 흐름 쿼리를 사용하면, 여러 원본에서 스트리밍되는 table에 추가하면서 전체 refresh를 실행할 필요가 없습니다.

다음 Python 예제에는 여러 데이터 원본을 절과 결합하는 쿼리가 UNION 포함되어 있습니다.

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

다음 예제에서는 쿼리를 UNION 추가 흐름 쿼리로 바꿉니다.

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );