워터마크를 적용하여 데이터 처리 임계값 제어
이 문서에서는 워터마크의 기본 개념을 소개하고 일반적인 상태 저장 스트리밍 작업에서 워터마크 사용에 관한 권장 사항을 제공합니다. 메모리 문제를 야기시켜 장기 실행 스트리밍 작업 중에 처리 대기 시간이 증가시킬 수 있으므로 상태 저장 스트리밍 작업에 워터마크를 적용하여 상태로 유지되는 데이터의 양이 무한히 확장하지 않게 해야 합니다.
워터마크란?
구조적 스트리밍은 워터마크를 사용하여 지정된 상태 엔터티에 대해 업데이트를 계속 처리하는 기간의 임계값을 제어합니다. 상태 엔터티의 일반적인 예는 다음과 같습니다.
- 시간 창에 대한 집계입니다.
- 두 스트림을 조인할 때 생기는 고유 키입니다.
워터마크를 선언할 때 스트리밍 데이터 프레임에 타임스탬프 필드와 워터마크 임계값을 지정합니다. 새 데이터가 도착하면 상태 관리자는 지정된 필드에서 가장 최근 타임스탬프를 추적하고 대기 시간 임계값 내의 모든 레코드를 처리합니다.
다음 예제에서는 창 개수에 10분 워터마크 임계값을 적용합니다.
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
이 예에서는 다음이 적용됩니다.
-
event_time
열은 10분 워터마크와 5분 연속 창을 정의하는 데 사용됩니다. - 겹치지 않는 각 5분 창에 대해 관찰된 각
id
창에 대해 개수가 수집됩니다. - 각 개수에 대한 상태 정보는 창의 끝이 관찰된 최신
event_time
보다 10분 이전까지 유지됩니다.
Important
워터마크 임계값은 지정된 임계값 내에 도착하는 레코드가 정의된 쿼리의 의미 체계에 따라 처리되도록 보장합니다. 지정된 임계값을 벗어나는 지연 도착 레코드는 여전히 쿼리 메트릭을 사용하여 처리될 수 있지만 이것이 보장되지는 않습니다.
워터마크는 처리 시간과 처리량에 어떻게 영향을 주나요?
워터마크는 출력 모드와 상호 작용하여 데이터가 싱크에 기록되는 시점을 제어합니다. 워터마크는 처리할 상태 정보의 총 양을 줄이기 때문에 효율적인 상태 저장 스트리밍 처리량을 위해 워터마크를 효과적으로 사용하는 것이 중요합니다.
참고 항목
모든 상태 저장 작업에 대해 모든 출력 모드가 지원되는 것은 아닙니다.
창 집계에 대한 워터마크 및 출력 모드
다음 표에서는 워터마크가 정의된 타임스탬프에서 집계가 있는 쿼리에 대한 처리를 자세히 설명합니다.
출력 모드 | 동작 |
---|---|
Append | 워터마크 임계값을 통과하면 행이 대상 테이블에 기록됩니다. 모든 쓰기는 대기 시간 임계값에 따라 지연됩니다. 임계값을 통과하면 이전 집계 상태가 삭제됩니다. |
업데이트 | 행은 결과가 계산될 때 대상 테이블에 기록되며 새 데이터가 도착하면 업데이트 및 덮어쓸 수 있습니다. 임계값을 통과하면 이전 집계 상태가 삭제됩니다. |
Complete | 집계 상태가 삭제되지 않습니다. 대상 테이블은 각 트리거를 사용하여 다시 작성됩니다. |
스트림-스트림 조인에 대한 워터마크 및 출력
여러 스트림 간의 조인은 추가 모드만 지원하며 일치하는 레코드는 검색된 각 일괄 처리로 기록됩니다. 내부 조인의 경우 Databricks는 각 스트리밍 데이터 원본에 워터마크 임계값을 설정하는 것이 좋습니다. 이렇게 하면 이전 레코드의 상태 정보를 버릴 수 있습니다. 워터마크가 없으면 구조적 스트리밍은 조인의 양쪽에서 모든 키를 각 트리거와 조인하려고 시도합니다.
구조적 스트리밍에는 외부 조인을 지원하는 특별한 의미 체계가 있습니다. 일치하지 않게 된 후 키를 null 값으로 작성해야 하는 시점을 나타내기 때문에 외부 조인에는 워터마크가 필수입니다. 외부 조인은 데이터 처리 중에 일치하지 않는 레코드를 기록하는 데 유용할 수 있지만 조인은 테이블에 추가 작업으로만 쓰기 때문에 대기 시간 임계값이 경과할 때까지 이 누락된 데이터는 기록되지 않습니다.
구조적 스트리밍에서 여러 워터마크 정책을 사용하여 지연 데이터 임계값 제어
여러 구조적 스트리밍 입력을 사용하는 경우 늦게 도착하는 데이터에 대한 허용 오차 임계값을 제어하도록 여러 워터마크를 설정할 수 있습니다. 워터마크를 구성하면 상태 정보를 제어하고 대기 시간에 영향을 미칠 수 있습니다.
스트리밍 쿼리에 여러 입력 스트림이 통합되거나 함께 조인되어 있을 수 있습니다. 입력 스트림마다 상태 저장 작업에 허용되어야 하는 지연 데이터의 임계값이 다를 수 있습니다. 각 입력 스트림에서 withWatermarks("eventTime", delay)
를 사용하여 이러한 임계값을 지정합니다. 다음은 스트림-스트림 조인을 사용하는 예제 쿼리입니다.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
쿼리를 실행하는 동안 구조적 스트리밍은 각 입력 스트림에 표시되는 최대 이벤트 시간을 개별적으로 추적하고, 해당 지연에 따라 워터마크를 계산하고, 상태 저장 작업에 사용할 단일 전역 워터마크를 선택합니다. 기본적으로 최소값은 스트림 중 하나가 다른 스트림에 뒤처질 경우 실수로 너무 늦게 삭제되지 않도록 하기 때문에 전역 워터마크로 선택됩니다(예: 스트림 중 하나가 업스트림 오류로 인해 데이터 수신을 중지함). 즉, 전역 워터마크는 가장 느린 스트림의 속도로 안전하게 이동하고 그에 따라 쿼리 출력이 지연됩니다.
더 빠른 결과를 얻으려면 SQL 구성 spark.sql.streaming.multipleWatermarkPolicy
max
설정하여 전역 워터마크로 최대값을 선택하도록 여러 워터마크 정책을 설정할 수 있습니다(기본값은 min
). 이렇게 하면 글로벌 워터마크가 가장 빠른 스트림의 속도로 움직이게 합니다. 그러나 이 구성은 가장 느린 스트림에서 데이터를 삭제합니다. 따라서 이 구성은 신중하게 사용하는 것이 좋습니다.
워터마크 내에 중복 항목 삭제
Databricks Runtime 13.3 LTS 이상에서는 고유 식별자를 사용하여 워터마크 임계값 내에서 레코드를 중복 제거할 수 있습니다.
구조적 스트리밍은 정확히 한 번 처리 보장을 제공하지만 데이터 원본에서 레코드를 자동으로 중복 제거하지는 않습니다.
dropDuplicatesWithinWatermark
사용하여 지정된 필드에 대한 레코드를 중복 제거하여 일부 필드가 다르더라도(예: 이벤트 시간 또는 도착 시간) 스트림에서 중복 항목을 제거할 수 있습니다.
지정된 워터마크 내에 도착하는 중복 레코드는 삭제됩니다. 이 보장은 한 방향으로만 엄격하며 지정된 임계값을 벗어나는 중복 레코드도 삭제될 수 있습니다. 모든 중복 항목을 제거하려면 중복된 이벤트 간의 최대 타임스탬프 차이보다 더 긴 워터마크의 지연 임계값을 설정해야 합니다.
다음 예제와 같이 dropDuplicatesWithinWatermark
메서드를 사용하려면 워터마크를 지정해야 합니다.
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])