다음을 통해 공유


첫 번째 구조적 스트리밍 워크로드 실행

이 문서에서는 Azure Databricks에서 첫 번째 구조적 스트리밍 쿼리를 실행하는 데 필요한 기본 개념에 대한 코드 예제 및 설명을 제공합니다. 거의 실시간 및 증분 처리 워크로드에 구조적 스트리밍을 사용할 수 있습니다.

구조적 스트리밍은 델타 라이브 테이블의 스트리밍 테이블에 전원을 공급하는 여러 기술 중 하나입니다. Databricks는 모든 새 ETL, 수집 및 구조적 스트리밍 워크로드에 델타 라이브 테이블을 사용하는 것이 좋습니다. 델타 라이브 테이블이란 무엇인가?을 참조하세요.

참고 항목

Delta Live Tables는 스트리밍 테이블을 선언하기 위해 약간 수정된 구문을 제공하지만 스트리밍 읽기 및 변환을 구성하기 위한 일반 구문은 Azure Databricks의 모든 스트리밍 사용 사례에 적용됩니다. 또한 Delta Live Tables는 상태 정보, 메타데이터 및 다양한 구성을 관리하여 스트리밍을 간소화합니다.

자동 로더를 사용하여 개체 스토리지에서 스트리밍 데이터 읽기

다음 예는 cloudFiles를 사용하여 형식과 옵션을 나타내는 자동 로더로 JSON 데이터를 로드하는 방법을 보여줍니다. schemaLocation 옵션을 사용하면 스키마 유추 및 진화가 가능합니다. 다음 코드를 Databricks 노트북 셀에 붙여넣고 셀을 실행하여 raw_df라는 이름의 스트리밍 데이터프레임을 만듭니다.

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Azure Databricks의 다른 읽기 작업과 마찬가지로 스트리밍 읽기를 구성해도 실제로 데이터가 로드되지는 않습니다. 스트림이 시작되기 전에 데이터에 대한 작업을 트리거해야 합니다.

참고 항목

스트리밍 데이터프레임에서 display()를 호출하면 스트리밍 작업이 시작됩니다. 대부분의 구조적 스트리밍 사용 사례의 경우 스트림을 트리거하는 작업은 싱크에 데이터를 작성해야 합니다. 구조적 스트리밍에 대한 프로덕션 고려 사항을 참조하세요.

스트리밍 변환 수행

구조적 스트리밍은 Azure Databricks 및 Spark SQL에서 사용할 수 있는 대부분의 변환을 지원합니다. MLflow 모델을 UDF로 로드하고 스트리밍 예측을 변환으로 만들 수도 있습니다.

다음 코드 예제에서는 Spark SQL 함수를 사용하여 수집된 JSON 데이터를 추가 정보로 보강하는 간단한 변환을 완료합니다.

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

결과 transformed_df에는 각 레코드가 데이터 원본에 도착할 때 로드 및 변환하는 쿼리 지침이 포함되어 있습니다.

참고 항목

구조적 스트리밍은 데이터 원본을 무제한 또는 무한 데이터 세트로 처리합니다. 따라서 일부 변환은 무한 개수의 항목을 정렬해야 하므로 구조적 스트리밍 워크로드에서 지원되지 않습니다.

대부분의 집계 및 많은 조인에는 워터마크, 창 및 출력 모드를 사용하여 상태 정보를 관리해야 합니다. 데이터 처리 임계값을 제어하기 위해 워터마크 적용하기를 참조하세요.

Delta Lake에 증분 일괄 처리 쓰기 수행

다음 예제에서는 지정된 파일 경로 및 검사점을 사용하여 Delta Lake에 씁니다.

Important

구성한 각 스트리밍 기록기에 대해 항상 고유한 검사점 위치를 지정해야 합니다. 검사점은 스트림에 대한 고유 ID를 제공하여 처리된 모든 레코드와 스트리밍 쿼리와 연결된 상태 정보를 추적합니다.

트리거에 대한 availableNow 설정은 구조화된 스트리밍이 소스 데이터 세트에서 이전에 처리되지 않은 모든 레코드를 처리한 다음 종료하도록 지시하므로 스트림이 실행 중일 때 걱정할 필요 없이 다음 코드를 안전하게 실행할 수 있습니다:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

이 예제에서는 데이터 원본에 새 레코드가 도착하지 않으므로 이 코드의 반복 실행은 새 레코드를 수집하지 않습니다.

Warning

구조적 스트리밍 실행은 자동 종료가 컴퓨팅 리소스를 종료하는 것을 방지할 수 있습니다. 예기치 않은 비용을 방지하려면 스트리밍 쿼리를 종료해야 합니다.

Delta Lake에서 데이터를 읽고, 변환하고, Delta Lake에 쓰기

Delta Lake는 구조적 스트리밍을 원본 및 싱크로 사용할 수 있는 광범위한 지원을 제공합니다. 델타 테이블 스트리밍 읽기 및 쓰기참조하세요.

다음 예시 구문에서는 델타 테이블에서 새로운 모든 레코드를 점진적으로 로드하고, 다른 델타 테이블의 스냅샷과 결합한 후, 이를 델타 테이블에 기록하는 과정을 보여 줍니다.

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

원본 테이블을 읽고 대상 테이블 및 지정된 체크포인트 위치에 쓸 수 있도록 적절한 권한을 설정해야 합니다. 데이터 원본 및 싱크에 대한 관련 값을 사용하여 꺾쇠 괄호(<>)로 표시된 모든 매개 변수를 채웁니다.

참고 항목

Delta Live Tables는 Delta Lake 파이프라인을 만들기 위한 완전한 선언적 구문을 제공하고 트리거 및 검사점과 같은 속성을 자동으로 관리합니다. 델타 라이브 테이블이란 무엇인가?참조하세요.

Kafka에서 데이터를 읽고, 변환하고, Kafka에 쓰기

Apache Kafka 및 기타 메시징 버스는 큰 데이터 세트에 사용할 수 있는 가장 낮은 대기 시간의 일부를 제공합니다. Azure Databricks를 사용하여 Kafka에서 수집된 데이터에 변환을 적용한 다음, Kafka에 데이터를 다시 쓸 수 있습니다.

참고 항목

클라우드 개체 스토리지에 데이터를 쓰면 대기 시간 오버헤드가 추가됩니다. Delta Lake의 메시징 버스에서 데이터를 저장하지만 스트리밍 워크로드에 대해 가능한 가장 낮은 대기 시간이 필요한 경우 Databricks는 레이크하우스에 데이터를 수집하고 다운스트림 메시징 버스 싱크에 대해 거의 실시간으로 변환을 적용하도록 별도의 스트리밍 작업을 구성하는 것이 좋습니다.

다음 코드 예제에서는 델타 테이블의 데이터와 조인한 다음 Kafka에 다시 작성하여 Kafka에서 데이터를 보강하는 간단한 패턴을 보여 줍니다.

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka 서비스에 액세스할 수 있도록 적절한 권한이 구성되어 있어야 합니다. 데이터 원본 및 싱크에 대한 관련 값을 사용하여 꺾쇠 괄호(<>)로 표시된 모든 매개 변수를 채웁니다. Apache Kafka 및 Azure Databricks를 사용한 스트림 처리를 참조하세요.