다음을 통해 공유


Delta Live Tables 사용하여 데이터 변환

이 문서에서는 Delta Live Tables 사용하여 데이터 세트에 대한 변환을 선언하고 쿼리 논리를 통해 레코드를 처리하는 방법을 지정하는 방법을 설명합니다. 또한 Delta Live Tables 파이프라인을 빌드하기 위한 일반적인 변환 패턴의 예제도 포함되어 있습니다.

DataFrame을 반환하는 모든 쿼리에 대해 데이터 세트를 정의할 수 있습니다. Apache Spark 내장 작업, UDF, 사용자 지정 논리 및 MLflow 모델을 Delta Live Tables 파이프라인의 변환으로 사용할 수 있습니다. 데이터를 Delta Live Tables 파이프라인으로 수집한 후 업스트림 소스에 대해 새 데이터 세트를 정의하여 새 스트리밍 tables, 구체화된 views및 views데이터를 만들 수 있습니다.

Delta Live 상태 저장 처리를 효과적으로 수행하는 방법을 배우려면 Delta Live 의 워터마크와 함께 상태 저장 처리에 대해 확인하십시오.

views, 구체화된 views및 스트리밍 tables 사용하는 경우

파이프라인 쿼리를 구현할 때 최상의 데이터 세트 형식을 선택하여 효율적이고 유지 관리가 가능한지 확인합니다.

보기를 사용하여 다음을 수행하는 것이 좋습니다.

  • 원하는 크거나 복잡한 쿼리를 관리하기 쉬운 쿼리로 분할합니다.
  • 기대치를 사용하여 중간 결과의 유효성을 검사합니다.
  • 유지할 필요가 없는 결과에 대한 스토리지 및 컴퓨팅 비용을 줄입니다. tables 구체화되므로 추가 계산 및 스토리지 리소스가 필요합니다.

다음과 같은 경우 구체화된 뷰를 사용하는 것이 좋습니다.

  • 여러 다운스트림 쿼리는 table을 소비합니다. views 요청 시 계산되므로 뷰를 쿼리할 때마다 뷰가 다시 계산됩니다.
  • 다른 파이프라인이나 작업, 쿼리는 table을 사용합니다. views가 실체화되지 않으므로 동일한 파이프라인에서만 사용할 수 있습니다.
  • 개발 중에 쿼리 결과를 보려는 경우. tables 구체화되고 파이프라인 외부에서 보고 쿼리할 수 있으므로 개발 중에 tables 사용하면 계산의 정확성을 확인하는 데 도움이 될 수 있습니다. 유효성을 검사한 후 구체화할 필요가 없는 쿼리를 views으로 변환합니다.

다음과 같은 경우 스트리밍 table 사용하는 것이 좋습니다.

  • 쿼리는 지속적으로 또는 증분적으로 증가하는 데이터 원본에 대해 정의됩니다.
  • 쿼리 결과는 증분 방식으로 계산되어야 합니다.
  • 파이프라인에는 높은 처리량과 짧은 대기 시간이 필요합니다.

참고 항목

스트리밍 tables는 언제나 스트리밍 소스를 기준으로 정의됩니다. 스트리밍 원본을 APPLY CHANGES INTO 사용하여 CDC 피드에서 업데이트를 적용할 수도 있습니다. 변경 내용 적용 API: Delta Live Tables를 사용하여 변경 데이터 캡처를 간소화하는 방법에 대해 알아보세요.

tables을(를) schema에서 제외하십시오.

외부 사용을 위한 것이 아닐 경우 중간 tables를 계산해야 한다면, TEMPORARY 키워드를 사용하여 그것들이 schema에 게시되지 않도록 할 수 있습니다. 임시 tables 여전히 Delta Live Tables 의미 체계에 따라 데이터를 저장하고 처리하지만 현재 파이프라인 외부에서 액세스해서는 안 됩니다. 임시 table는 이를 만드는 파이프라인의 수명 동안 유지됩니다. 다음 구문을 사용하여 임시 tables선언합니다.

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

단일 파이프라인에서 스트리밍 tables 및 구체화 views 합치기

스트리밍 tables은 Apache Spark 구조적 스트리밍의 처리 보장을 기반으로 하며, 추가 전용 데이터 원본에서 쿼리를 처리하도록 구성됩니다. where 새 행은 원본 table에 항상 삽입되며, 수정되지 않습니다.

참고 항목

기본적으로 스트리밍 tables에는 추가 전용 데이터 소스가 필요하지만, 스트리밍 소스가 업데이트 또는 삭제가 필요한 또 다른 스트리밍 table인 경우에는, 이 동작을 skipChangeCommits 플래그 로 재정의할 수 있습니다.

일반적인 스트리밍 패턴에는 원본 데이터를 수집하여 파이프라인에서 초기 데이터 세트를 만드는 작업이 포함됩니다. 이러한 초기 데이터 세트는 일반적으로 브론즈 tables 호출되며 종종 간단한 변환을 수행합니다.

반면, 일반적으로 골드 tables으로 불리는 파이프라인의 최종 tables는 종종 복잡한 집계나 APPLY CHANGES INTO 작업의 대상에서 읽기를 필요로 합니다. 이러한 작업은 기본적으로 추가가 아닌 업데이트를 만들기 때문에 스트리밍 tables의 입력으로 지원되지 않습니다. 이러한 변환은 views가 구현된 경우에 더 적합합니다.

스트리밍 tables과 구현된 views을 하나의 단일 파이프라인으로 혼합함으로써 파이프라인을 간소화하고, 비용이 많이 드는 원시 데이터의 재수집 또는 재처리를 피하고, SQL의 모든 기능을 사용하여 효율적으로 인코딩되고 필터링된 데이터 세트를 통해 복잡한 집계를 계산할 수 있습니다. 다음 예는 이러한 유형의 혼합 처리를 보여 줍니다.

참고 항목

이러한 예제에서는 자동 로더를 사용하여 클라우드 스토리지에서 파일을 로드합니다. Unity Catalog 사용하도록 설정된 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. Unity Catalog을(를) Delta Live Tables과 함께 사용하는 방법에 대한 자세한 내용은 의 "Unity Catalog을(를) Delta Live Tables 파이프라인와 함께 사용하기"를 참조하세요.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("LIVE.streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

자동 로더를 사용하여 Azure Storage에서 JSON 파일을 증분 방식으로 수집하는 방법에 대해 자세히 알아봅니다.

스트림 정적 조인

스트림 정적 조인은 주로 정적 차원 table추가 전용 데이터의 연속 스트림을 비정규화하는 경우에 적합합니다.

각 파이프라인 update에서 스트림의 새 레코드는 정적 table의 최신 스냅샷과 함께 조인됩니다. 스트리밍 table의 데이터가 처리된 후에 정적 table에서 레코드가 추가되거나 업데이트되는 경우, 전체 refresh가 수행되지 않는 한 결과 레코드는 다시 산출되지 않습니다.

트리거된 실행을 위해 구성된 파이프라인에서, 정적 table은 update이 시작된 시점의 결과를 반환합니다. 연속 실행을 위해 구성된 파이프라인에서 tableupdate처리할 때마다 정적 table 최신 버전이 쿼리됩니다.

다음은 스트림-정적 join의 예입니다.

Python

@dlt.table
def customer_sales():
  return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

집계를 효율적으로 계산

스트리밍 tables 사용하여 개수, 최소, 최대 또는 합계와 같은 간단한 분산 집계와 평균 또는 표준 편차와 같은 대수 집계를 증분 방식으로 계산할 수 있습니다. Databricks는 절이 있는 쿼리와 같이 그룹 수가 제한된 쿼리에 대해 증분 집계를 GROUP BY country 권장합니다. 오직 새 입력 데이터만 각 update에서 읽습니다.

증분 집계를 수행하는 Delta Live Tables 쿼리 작성에 대한 자세한 내용은 워터마크사용하여 창이 지정된 집계 수행을 참조하세요.

Delta Live Tables 파이프라인에서 MLflow 모델 사용

참고 항목

Unity Catalog사용 파이프라인에서 MLflow 모델을 사용하려면 preview 채널을 사용하도록 파이프라인을 구성해야 합니다. 채널을 사용 current 하려면 Hive 메타스토어에 게시하도록 파이프라인을 구성해야 합니다.

Delta Live Tables 파이프라인에서 MLflow 학습 모델을 사용할 수 있습니다. MLflow 모델은 Azure Databricks에서 변환으로 처리됩니다. 즉, Spark DataFrame 입력에 따라 작동하고 결과를 Spark DataFrame으로 반환합니다. Delta Live Tables DataFrames에 대한 데이터 세트를 정의하므로 몇 줄의 코드만으로 MLflow를 사용하는 Apache Spark 워크로드를 Delta Live Tables 변환할 수 있습니다. MLflow에 대한 자세한 내용은 gen AI 앱 및 모델 수명 주기대한 MLflow를 참조하세요.

Python 노트북에서 MLflow 모델을 이미 호출하고 있는 경우, Delta Live Tables에 코드를 적응하기 위해 @dlt.table 데코레이터를 사용하고 함수가 변환 결과를 반환하도록 정의해야 합니다. Delta Live Tables은 기본적으로 MLflow를 설치하지 않으므로, %pip install mlflow에서 MLflow 라이브러리가 설치되었는지 확인하고, 노트북 맨 위에 mlflowdlt을 가져왔는지 확인합니다. Delta Live Tables 구문에 대한 소개는 Python사용하여 파이프라인 코드 개발을 참조하세요.

Delta Live TablesMLflow 모델을 사용하려면 다음 단계를 완료합니다.

  1. MLflow 모델의 실행 ID 및 모델 이름을 가져옵니다. 실행 ID와 모델 이름은 MLflow 모델의 URI를 구성하는 데 사용됩니다.
  2. URI를 사용하여 MLflow 모델을 로드할 Spark UDF를 정의합니다.
  3. MLflow 모델을 사용하려면 table 정의에서 UDF를 호출합니다.

다음 예제에서는 이 패턴의 기본 구문을 보여 줍니다.

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

전체 예제로, 다음 코드는 대출 위험 데이터에 대해 학습된 MLflow 모델을 로드하는 Spark UDF를 loaded_model_udf 정의합니다. 예측을 만드는 데 사용되는 데이터 columns UDF에 인수로 전달됩니다. table loan_risk_predictions loan_risk_input_data의 각 행 예측을 계산합니다.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

수동 삭제 또는 업데이트 유지

Delta Live Tables 사용하면 table 레코드를 수동으로 삭제하거나 update 레코드를 삭제할 수 있으며, refresh 작업을 수행하여 다운스트림 tables를 다시 계산할 수 있습니다.

기본적으로 Delta Live Tables 파이프라인이 업데이트될 때마다 입력 데이터를 기반으로 table 결과를 다시 계산하므로 삭제된 레코드가 원본 데이터에서 다시 로드되지 않도록 해야 합니다. pipelines.reset.allowed table 속성을 false로 설정하면 table의 새로 고침을 방지하지만, tables로의 증분 쓰기나 새로운 데이터가 table로 유입되는 것을 방지하지는 않습니다.

다음 다이어그램은 두 개의 스트리밍 tables을 사용하는 예제를 보여줍니다.

  • raw_user_table 원본에서 원시 사용자 데이터를 수집합니다.
  • bmi_tableraw_user_table의 체중과 키를 사용하여 BMI 점수를 점진적으로 계산합니다.

raw_user_table 사용자 레코드를 수동으로 삭제하거나 updatebmi_table다시 계산하려고 합니다.

데이터 보존 다이어그램

다음 코드는 raw_user_table가 의도한 변경 사항을 시간이 지나도 유지할 수 있도록 하면서 파이프라인 update가 실행될 때 다운스트림 tables가 다시 계산되도록 pipelines.reset.allowedtable 속성을 false로 설정하여 전체 refresh를 비활성화하는 방법을 보여 줍니다.

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);