다음을 통해 공유


Delta Live Tables를 사용하여 데이터 변환

이 문서에서는 Delta Live Tables를 사용하여 데이터 세트에 대한 변환을 선언하고 쿼리 논리를 통해 레코드를 처리하는 방법을 지정하는 방법을 설명합니다. 또한 Delta Live Tables 파이프라인을 빌드하기 위한 일반적인 변환 패턴의 예제도 포함되어 있습니다.

DataFrame을 반환하는 모든 쿼리에 대해 데이터 세트를 정의할 수 있습니다. Delta Live Tables 파이프라인에서 Apache Spark 기본 제공 작업, UDF, 사용자 지정 논리 및 MLflow 모델을 변환으로 사용할 수 있습니다. 델타 라이브 테이블 파이프라인에 데이터를 수집한 후에는 업스트림 원본에 대해 새 데이터 세트를 정의하여 새 스트리밍 테이블, 구체화된 뷰 및 뷰를 만들 수 있습니다.

Delta Live Tables를 사용하여 상태 저장 처리를 효과적으로 수행하는 방법을 알아보려면 워터마크를 사용하여 Delta Live Tables에서 상태 저장 처리 최적화를 참조 하세요.

뷰, 구체화된 뷰 및 스트리밍 테이블을 사용하는 경우

파이프라인 쿼리를 구현할 때 최상의 데이터 세트 형식을 선택하여 효율적이고 유지 관리가 가능한지 확인합니다.

보기를 사용하여 다음을 수행하는 것이 좋습니다.

  • 원하는 크거나 복잡한 쿼리를 관리하기 쉬운 쿼리로 분할합니다.
  • 기대치를 사용하여 중간 결과의 유효성을 검사합니다.
  • 유지할 필요가 없는 결과에 대한 스토리지 및 컴퓨팅 비용을 줄입니다. 테이블은 구체화되므로 추가 계산 및 스토리지 리소스가 필요합니다.

다음과 같은 경우 구체화된 뷰를 사용하는 것이 좋습니다.

  • 여러 다운스트림 쿼리가 테이블을 사용하는 경우. 뷰는 요청 시 계산되므로 뷰를 쿼리할 때마다 뷰가 다시 계산됩니다.
  • 다른 파이프라인, 작업 또는 쿼리는 테이블을 사용합니다. 뷰는 구체화되지 않으므로 동일한 파이프라인에서만 사용할 수 있습니다.
  • 개발 중에 쿼리 결과를 보려는 경우. 테이블은 구체화되고 파이프라인 외부에서 보고 쿼리할 수 있으므로 개발 중에 테이블을 사용하면 계산의 정확성을 검증하는 데 도움이 될 수 있습니다. 검증 후 구체화할 필요가 없는 쿼리를 뷰로 변환합니다.

다음과 같은 경우 스트리밍 테이블을 사용하는 것이 좋습니다.

  • 쿼리는 지속적으로 또는 증분적으로 증가하는 데이터 원본에 대해 정의됩니다.
  • 쿼리 결과는 증분 방식으로 계산되어야 합니다.
  • 파이프라인에는 높은 처리량과 짧은 대기 시간이 필요합니다.

참고 항목

스트리밍 테이블은 항상 스트리밍 원본에 대해 정의됩니다. 스트리밍 원본을 APPLY CHANGES INTO 사용하여 CDC 피드에서 업데이트를 적용할 수도 있습니다. APPLY CHANGES API: Delta Live Tables을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

대상 스키마에서 테이블 제외

외부 사용을 위한 것이 아닌 중간 테이블을 계산해야 하는 경우 키워드를 사용하여 TEMPORARY 스키마에 게시되지 않도록 할 수 있습니다. 임시 테이블은 여전히 Delta Live Tables 의미 체계에 따라 데이터를 저장하고 처리하지만 현재 파이프라인 외부에서 액세스해서는 안 됩니다. 임시 테이블은 생성되는 파이프라인의 수명 동안 유지됩니다. 다음 구문을 사용하여 임시 테이블을 선언합니다.

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

단일 파이프라인에서 스트리밍 테이블 및 구체화된 뷰 결합

스트리밍 테이블은 Apache Spark 구조적 스트리밍의 처리 보장을 상속하며 추가 전용 데이터 원본에서 쿼리를 처리하도록 구성됩니다. 여기서 새 행은 항상 수정되지 않고 원본 테이블에 삽입됩니다.

참고 항목

기본적으로 스트리밍 테이블에는 추가 전용 데이터 원본이 필요하지만 스트리밍 원본이 업데이트 또는 삭제가 필요한 다른 스트리밍 테이블인 경우 skipChangeCommits 플래그를 사용하여 이 동작을 재정의할 수 있습니다.

일반적인 스트리밍 패턴에는 원본 데이터를 수집하여 파이프라인에서 초기 데이터 세트를 만드는 작업이 포함됩니다. 이러한 초기 데이터 세트는 일반적으로 브론즈 테이블이라고 하며 간단한 변환을 수행하는 경우가 많습니다.

반면, 일반적으로 골드 테이블이라고 하는 파이프라인의 최종 테이블에는 종종 복잡한 집계 또는 작업 대상에서 읽기가 APPLY CHANGES INTO 필요합니다. 이러한 작업은 기본적으로 추가가 아닌 업데이트를 만들기 때문에 스트리밍 테이블에 대한 입력으로 지원되지 않습니다. 이러한 변환은 구체화된 뷰에 더 적합합니다.

스트리밍 테이블과 구체화된 뷰를 단일 파이프라인으로 혼합하여 파이프라인을 간소화하고, 비용이 많이 드는 원시 데이터 수집 또는 다시 처리를 방지하고, SQL의 모든 기능을 통해 효율적으로 인코딩되고 필터링된 데이터 세트를 통해 복잡한 집계를 계산할 수 있습니다. 다음 예는 이러한 유형의 혼합 처리를 보여 줍니다.

참고 항목

이러한 예제에서는 자동 로더를 사용하여 클라우드 스토리지에서 파일을 로드합니다. Unity 카탈로그를 사용하는 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. Delta Live Tables과 함께 Unity 카탈로그를 사용하는 방법에 대해 자세히 알아보려면 Delta Live Tables 파이프라인과 함께 Unity 카탈로그 사용을 참조하세요.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("LIVE.streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

자동 로더를 사용하여 Azure Storage에서 JSON 파일을 증분 방식으로 수집하는 방법에 대해 자세히 알아봅니다.

스트림 정적 조인

스트림 정적 조인은 주로 정적 차원 테이블로 추가 전용 데이터의 연속 스트림을 비정규화하는 경우에 적합합니다.

각 파이프라인 업데이트에서 스트림의 새 레코드는 정적 테이블의 최신 스냅샷과 조인됩니다. 스트리밍 테이블의 해당 데이터를 처리한 후 정적 테이블에서 레코드를 추가하거나 업데이트하는 경우 전체 새로 고침이 수행되지 않는 한 결과 레코드는 다시 계산되지 않습니다.

트리거된 실행을 위해 구성된 파이프라인에서 정적 테이블은 업데이트가 시작된 시점의 결과를 반환합니다. 연속 실행을 위해 구성된 파이프라인에서 테이블이 업데이트를 처리할 때마다 최신 버전의 정적 테이블이 쿼리됩니다.

다음은 스트림 정적 조인의 예입니다.

Python

@dlt.table
def customer_sales():
  return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

집계를 효율적으로 계산

스트리밍 테이블을 사용하여 개수, 최소, 최대 또는 합계와 같은 간단한 분산 집계와 평균 또는 표준 편차와 같은 대수 집계를 증분 방식으로 계산할 수 있습니다. Databricks는 절이 있는 쿼리와 같이 그룹 수가 제한된 쿼리에 대해 증분 집계를 GROUP BY country 권장합니다. 업데이트할 때마다 새 입력 데이터만 읽습니다.

증분 집계를 수행하는 Delta Live Tables 쿼리를 작성하는 방법에 대한 자세한 내용은 워터마크를 사용하여 창에 표시된 집계 수행을 참조 하세요.

Delta Live Tables 파이프라인에서 MLflow 모델 사용

참고 항목

Unity 카탈로그 사용 파이프라인에서 MLflow 모델을 사용하려면 채널을 사용하도록 파이프라인을 preview 구성해야 합니다. 채널을 사용 current 하려면 Hive 메타스토어에 게시하도록 파이프라인을 구성해야 합니다.

Delta Live Tables 파이프라인에서 MLflow 학습 모델을 사용할 수 있습니다. MLflow 모델은 Azure Databricks에서 변환으로 처리됩니다. 즉, Spark DataFrame 입력에 따라 작동하고 결과를 Spark DataFrame으로 반환합니다. Delta Live Tables는 DataFrames에 대한 데이터 세트를 정의하므로 MLflow를 사용하는 Apache Spark 워크로드를 몇 줄의 코드만으로 델타 라이브 테이블로 변환할 수 있습니다. MLflow에 대한 자세한 내용은 MLflow를 사용한 ML 수명 주기 관리를 참조하세요.

MLflow 모델을 호출하는 Python Notebook이 이미 있는 경우 데코레이터를 사용하고 @dlt.table 변환 결과를 반환하도록 함수가 정의되도록 하여 이 코드를 Delta Live Tables에 적용할 수 있습니다. Delta Live Tables는 기본적으로 MLflow를 설치하지 않으므로 MLFlow 라이브러리 %pip install mlflow 를 전자 필기장 맨 위에 설치했는지 mlflow dlt 확인합니다. Delta Live Tables 구문에 대한 소개는 Python을 사용하여 파이프라인 코드 개발을 참조 하세요.

Delta Live Tables에서 MLflow 모델을 사용하려면 다음 단계를 완료합니다.

  1. MLflow 모델의 실행 ID 및 모델 이름을 가져옵니다. 실행 ID와 모델 이름은 MLflow 모델의 URI를 구성하는 데 사용됩니다.
  2. URI를 사용하여 MLflow 모델을 로드할 Spark UDF를 정의합니다.
  3. MLflow 모델을 사용하려면 테이블 정의에서 UDF를 호출합니다.

다음 예제에서는 이 패턴의 기본 구문을 보여 줍니다.

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

전체 예제로, 다음 코드는 대출 위험 데이터에 대해 학습된 MLflow 모델을 로드하는 Spark UDF를 loaded_model_udf 정의합니다. 예측을 만드는 데 사용되는 데이터 열은 UDF에 인수로 전달됩니다. 이 테이블 loan_risk_predictions 은 .의 loan_risk_input_data각 행에 대한 예측을 계산합니다.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

수동 삭제 또는 업데이트 유지

Delta Live Tables를 사용하면 테이블에서 레코드를 수동으로 삭제하거나 업데이트하고 새로 고침 작업을 수행하여 다운스트림 테이블을 다시 계산할 수 있습니다.

기본적으로 Delta Live Tables는 파이프라인이 업데이트될 때마다 입력 데이터를 기반으로 테이블 결과를 다시 계산하므로 삭제된 레코드가 원본 데이터에서 다시 로드되지 않도록 해야 합니다. pipelines.reset.allowed 테이블 속성을 설정하면 false 테이블에 대한 새로 고침이 방지되지만 테이블에 대한 증분 쓰기 또는 새 데이터가 테이블로 흐르는 것을 방지하지는 않습니다.

다음 다이어그램에서는 두 개의 스트리밍 테이블을 사용하는 예제를 보여 줍니다.

  • raw_user_table 원본에서 원시 사용자 데이터를 수집합니다.
  • bmi_tableraw_user_table의 체중과 키를 사용하여 BMI 점수를 점진적으로 계산합니다.

에서 사용자 레코드 raw_user_table 를 수동으로 삭제하거나 업데이트하고 bmi_table.

데이터 보존 다이어그램

다음 코드에서는 의도한 변경 내용이 시간에 따라 유지되지만 파이프라인 업데이트가 실행될 때 다운스트림 테이블이 다시 계산되도록 전체 새로 고침 raw_user_table 을 사용하지 않도록 테이블 속성을 false 설정하는 pipelines.reset.allowed 방법을 보여 줍니다.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);