스트리밍 데이터에 Delta Lake 사용
지금까지 살펴본 데이터는 모두 파일의 정적 데이터였습니다. 그러나 많은 데이터 분석 시나리오에는 거의 실시간으로 처리해야 하는 스트리밍 데이터가 포함됩니다. 예를 들어 IoT(사물 인터넷) 디바이스에서 내보낸 판독값을 캡처하고 테이블에 저장해야 할 수 있습니다.
Spark Structured Streaming
일반적인 스트림 처리 솔루션에는 원본에서 데이터 스트림을 지속적으로 읽고 필요에 따라 처리하여 특정 필드를 선택하고, 값을 집계, 그룹화 또는 기타 방식으로 데이터를 조작하고, 결과를 싱크에 쓰는 작업이 포함됩니다.
Spark에는 처리를 위해 스트리밍 데이터가 캡처되는 무한한 데이터 프레임을 기반으로 하는 API인 Spark 구조적 스트리밍을 통한 데이터 스트리밍 기본 지원이 포함됩니다. Spark 구조적 스트리밍 데이터 프레임은 네트워크 포트, 실시간 메시지 브로커링 서비스(예: Azure Event Hubs 또는 Kafka) 또는 파일 시스템 위치를 포함하여 다양한 종류의 스트리밍 원본에서 데이터를 읽을 수 있습니다.
팁
Spark 구조적 스트리밍에 대한 자세한 내용은 Spark 설명서에서 구조적 스트리밍 프로그래밍 가이드를 참조하세요.
Delta Lake 테이블을 사용한 스트리밍
Delta Lake 테이블을 Spark 구조적 스트리밍의 원본 또는 싱크로 사용할 수 있습니다. 예를 들어 IoT 디바이스에서 실시간 데이터 스트림을 캡처하고 싱크로서 Delta Lake 테이블에 직접 스트림을 쓸 수 있으므로 테이블을 쿼리하여 최근에 스트리밍된 데이터를 볼 수 있습니다. 또는 스트리밍 원본으로서 Delta Lake 테이블을 읽어 테이블에 추가되는 새 데이터를 지속적으로 보고할 수 있습니다.
Delta Lake 테이블을 스트리밍 원본으로 사용
다음 PySpark 예제에서는 Delta Lake 테이블을 사용하여 인터넷 판매 주문의 세부 정보를 저장합니다. 새 데이터가 추가될 때 Delta Lake 테이블 폴더에서 데이터를 읽는 스트림이 만들어집니다.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
참고
Delta Lake 테이블을 스트리밍 원본으로 사용하는 경우 추가 작업만 스트림에 포함할 수 있습니다. ignoreChanges
또는 ignoreDeletes
옵션을 지정하지 않을 경우 데이터 수정으로 인해 오류가 발생합니다.
데이터를 Delta Lake 테이블에서 스트리밍 데이터 프레임으로 데이터를 읽은 후 Spark 구조적 스트리밍 API를 사용하여 처리할 수 있습니다. 위의 예제에서 데이터 프레임은 단순히 표시됩니다. 하지만 Spark 구조적 스트리밍을 사용하여 임시 창을 통해 데이터를 집계하고(예: 1분마다 주문 수 계산) 근 실시간 시각화를 위해 집계된 결과를 다운스트림 프로세스로 보낼 수 있습니다.
Delta Lake 테이블을 스트리밍 싱크로 사용
다음 PySpark 예제에서는 폴더의 JSON 파일에서 데이터 스트림을 읽습니다. 각 파일의 JSON 데이터에는 {"device":"Dev1","status":"ok"}
형식의 IoT 디바이스 상태가 포함됩니다. 파일이 폴더에 추가될 때마다 새 데이터가 스트림에 추가됩니다. 입력 스트림은 Delta Lake 테이블의 폴더 위치에 델타 형식으로 기록되는 무한대 데이터 프레임입니다.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
참고
checkpointLocation
옵션은 스트림 처리 상태를 추적하는 검사점 파일을 작성하는 데 사용됩니다. 이 파일을 사용하면 스트림 처리가 중단된 지점에서 장애로부터 복구할 수 있습니다.
스트리밍 프로세스가 시작된 후에는 스트리밍 출력이 기록되는 Delta Lake 테이블을 쿼리하여 최신 데이터를 볼 수 있습니다. 예를 들어 다음 코드는 Delta Lake 테이블 폴더에 대한 카탈로그 테이블을 만들고 쿼리합니다.
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Delta Lake 테이블에 기록되는 데이터 스트림을 중지하려면 스트리밍 쿼리의 stop
메서드를 사용할 수 있습니다.
delta_stream.stop()
팁
데이터 스트리밍에 Delta Lake 테이블을 사용하는 방법에 대한 자세한 내용은 Delta Lake 설명서에서 테이블 스트리밍 읽기 및 쓰기 를 참조하세요.