다음을 통해 공유


Delta Live Tables에서 워터마크와 함께 상태 저장 처리 Optimize

상태 데이터를 효과적으로 관리하기 위해, 집계, 조인 및 중복 제거를 포함하는 Delta Live Tables의 상태 저장 스트림 처리 시 워터마크를 사용하세요. 이 문서에서는 Delta Live Tables 쿼리에서 워터마크를 사용하는 방법을 설명하고 권장 작업의 예를 포함합니다.

참고 항목

집계를 수행하는 쿼리들이 증분 방식으로 처리되고, 각 update마다 완전히 다시 계산되지 않도록 하려면 워터마크를 사용해야 합니다.

watermark무엇입니까?

스트림 처리에서 watermark 집계와 같은 상태 저장 작업을 수행할 때 데이터를 처리하기 위한 시간 기반 임계값을 정의할 수 있는 Apache Spark 기능입니다. 데이터 도착은 임계값에 도달할 때까지 처리되며, 이때 임계값으로 정의된 window 시간이 닫힙니다. 워터마크는 주로 더 큰 데이터 세트 또는 장기 실행 처리를 처리할 때 쿼리 처리 중에 문제를 방지하는 데 사용할 수 있습니다. 이러한 문제에는 결과 생성 시 대기 시간이 높고 처리 중 상태로 유지되는 데이터의 양 때문에 OOM(메모리 부족) 오류도 포함될 수 있습니다. 스트리밍 데이터는 본질적으로 순서가 지정되지 않으므로 워터마크는 시간window 집계와 같은 올바른 계산 작업도 지원합니다.

스트림 처리 에서 워터마크를 사용하는 방법에 대한 자세한 내용은 Apache Spark 구조적 스트리밍 의 워터마킹 및 워터마크 적용을 참조하여 데이터 처리 임계값을 제어합니다.

watermark정의하려면 어떻게 해야 할까요?

타임스탬프 필드와 지연 데이터 도착하는 시간 임계값을 나타내는 값을 지정하여 watermark 정의합니다. 데이터는 정의된 시간 임계값 이후에 도착하는 경우 늦게 간주됩니다. 예를 들어 임계값이 10분으로 정의된 경우 10분 임계값 이후에 도착하는 레코드가 삭제될 수 있습니다.

정의된 임계값 이후에 도착하는 레코드는 삭제될 수 있으므로 대기 시간 및 정확성 요구 사항을 충족하는 임계값을 선택하는 것이 중요합니다. 임계값을 더 작게 선택하면 레코드가 더 빨리 내보내지지만 늦은 레코드가 삭제될 가능성이 더 높다는 의미이기도 합니다. 임계값이 클수록 대기 시간이 길어지지만 데이터의 완전성이 높아질 수 있습니다. 상태 크기가 커지므로 임계값이 클수록 추가 컴퓨팅 리소스가 필요할 수도 있습니다. 임계값은 데이터 및 처리 요구 사항에 따라 달라지므로 최적의 임계값을 결정하는 데 처리 테스트 및 모니터링이 중요합니다.

Python에서 withWatermark() 함수를 사용하여 watermark을 정의합니다. SQL에서 WATERMARK 절을 사용하여 watermark을 정의합니다.

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

스트림 스트림 조인에 워터마크 사용

스트림 스트림 조인의 경우, join의 양쪽 모두에 watermark을 정의하고 시간 간격 절을 정의해야 합니다. 각 join 원본은 데이터의 불완전한 관점을 가지고 있기 때문에, 스트리밍 엔진이 더 이상 일치를 만들 수 없을 때를 이해하도록 시간 간격 절이 필요합니다. 시간 간격 절은 워터마크를 정의하는 데 사용되는 것과 동일한 필드를 사용해야 합니다.

각 스트림에 워터마크에 대해 서로 다른 임계값이 필요한 경우가 있을 수 있으므로 스트림에 동일한 임계값이 필요하지 않습니다. 데이터 누락을 방지하기 위해 스트리밍 엔진은 가장 느린 스트림에 따라 하나의 전역 watermark 유지 관리합니다.

다음 예에서는 광고 노출 스트림과 광고 클릭 스트림을 조인합니다. 이 예제에서는 노출 후 3분 이내에 클릭이 발생해야 합니다. 3분 간격이 지나면 더 이상 일치시킬 수 없는 상태의 행이 삭제됩니다.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

워터마크를 사용하여 창이 있는 집계 수행

스트리밍 데이터에 대한 일반적인 상태 저장 작업은 창이 있는 집계입니다. 윈도우 집계는 그룹화된 집계와 유사합니다. 그러나 집계 values은 정의된 window에 속하는 행의 set에 대해 반환됩니다.

window 특정 길이로 정의할 수 있으며 해당 window일부인 모든 행에서 집계 작업을 수행할 수 있습니다. Spark Streaming은 다음 세 가지 유형의 창을 지원합니다.

  • 연속(고정) 창: 고정 크기, 겹치지 않는 일련의 연속 시간 간격입니다. 입력 레코드는 하나의 window에 속합니다.
  • 슬라이딩 윈도우: 연속 창과 비슷하게 슬라이딩 윈도우는 고정 크기이지만 창이 겹칠 수 있으며 레코드가 여러 창으로 떨어질 수 있습니다.

데이터가 window의 끝을 지나 watermark의 길이에 도달하면, window에 대해 새로운 데이터가 더 이상 허용되지 않으며, 집계 결과가 출력되고 window의 상태가 삭제됩니다.

다음 예제에서는 window라는 고정된 값을 사용하여 매 5분마다 노출 수의 합계를 계산합니다. 이 예제에서 select 절은 별칭 impressions_window사용하고 window 자체는 GROUP BY 절의 일부로 정의됩니다. window는 이 예제에서 clickTimestampcolumn인 watermark와 동일한 타임스탬프 column을 기반으로 해야 합니다.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

시간당 고정 기간 동안 수익을 계산하는 Python의 유사한 예:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

스트리밍 레코드 중복 제거

구조적 스트리밍은 정확히 한 번만 처리할 수 있지만 데이터 원본에서 레코드를 자동으로 중복 제거하지는 않습니다. 예를 들어 많은 메시지 큐에 한 번 이상의 보장이 있으므로 이러한 메시지 큐 중 하나에서 읽을 때 중복 레코드가 필요합니다. 함수를 dropDuplicatesWithinWatermark() 사용하여 지정된 필드에 대한 레코드를 중복 제거하여 일부 필드가 다른 경우에도(예: 이벤트 시간 또는 도착 시간) 스트림에서 중복 항목을 제거할 수 있습니다. dropDuplicatesWithinWatermark() 함수를 사용하려면 watermark 지정해야 합니다. watermark 지정된 시간 범위 내에 도착하는 모든 중복 데이터가 삭제됩니다.

순서가 잘못된 데이터로 인해 watermark 값이 잘못 진행되므로 정렬된 데이터가 중요합니다. 그런 다음 이전 데이터가 도착하면 늦게 삭제된 것으로 간주됩니다. withEventTimeOrder 옵션을 사용하여 watermark에 지정된 타임스탬프에 따라 초기 스냅샷을 순서대로 처리합니다. 이 withEventTimeOrder 옵션은 데이터 세트를 정의하는 코드 또는 파이프라인 설정에서 선언할 수 있습니다spark.databricks.delta.withEventTimeOrder.enabled. 예시:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

참고 항목

withEventTimeOrder 옵션은 Python에서만 지원됩니다.

다음 예제에서는 데이터가 clickTimestamp에 따라 정렬되며, 서로 5초 이내에 도착하고 중복된 userIdclickAdIdcolumns를 포함한 레코드는 삭제됩니다.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

상태 저장 처리를 위한 파이프라인 구성 Optimize

프로덕션 문제와 과도한 대기 시간을 방지하기 위해 Databricks는 상태 저장 스트림 처리에 대해 RocksDB 기반 상태 관리를 사용하도록 설정하는 것이 좋습니다. 특히 처리 시 많은 양의 중간 상태를 저장해야 하는 경우

세버리스 파이프라인은 상태 저장소 구성을 자동으로 관리합니다.

파이프라인을 배포하기 전에 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB에 대한 구성 권장 사항을 포함하여 RocksDB 상태 저장소에 대한 자세한 내용은 Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.