자습서: 엔드투엔드 Lakehouse 분석 파이프라인 실행
이 자습서에서는 Azure Databricks Lakehouse에 대한 엔드투엔드 분석 파이프라인을 설정하는 방법을 보여 줍니다.
Important
이 자습서에서는 대화형 Notebook을 사용하여 Unity 카탈로그 지원 클러스터에서 Python으로 공통 ETL 작업을 수행합니다. Unity Catalog를 사용하지 않는 경우 Azure Databricks에서 첫 번째 ETL 워크로드 실행을 참조하세요.
이 자습서의 태스크
이 문서의 학습을 마치면 다음 개념을 이해할 수 있습니다.
- Unity 카탈로그를 지원하는 컴퓨팅 클러스터 시작
- Databricks Notebook 만들기
- Unity 카탈로그 외부 위치에서 데이터 쓰기 및 읽기
- 자동 로더를 사용하여 Unity 카탈로그 테이블에 대한 증분 데이터 수집 구성
- Notebook 셀을 실행하여 데이터 처리, 쿼리 및 미리 보기
- Databricks 작업으로 Notebook 예약
- Databricks SQL에서 Unity 카탈로그 쿼리
Azure Databricks는 데이터 전문가가 ETL(추출, 변환 및 로드) 파이프라인을 신속하게 개발하고 배포할 수 있도록 지원하는 프로덕션 준비 도구 모음을 제공합니다. 데이터 관리자는 Unity 카탈로그를 사용해서 조직 전체 사용자를 위해 스토리지 자격 증명, 외부 위치 및 데이터베이스 개체를 구성하고 보호할 수 있습니다. 분석가는 Databricks SQL을 사용해서 프로덕션 ETL 워크로드에 사용되는 동일한 테이블에 대해 SQL 쿼리를 실행할 수 있으므로, 대규모 실시간 비즈니스 인텔리전스가 가능합니다.
Delta Live Tables를 사용하여 ETL 파이프라인을 빌드할 수도 있습니다. Databricks는 프로덕션 ETL 파이프라인을 빌드, 배포 및 유지 관리하는 복잡성을 줄이기 위해 Delta Live Tables를 만들었습니다. 자습서: 첫 번째 Delta Live Tables 파이프라인 실행을 참조하세요.
요구 사항
참고 항목
클러스터 제어 권한이 없어도 클러스터 액세스 권한이 있는 한 아래 단계를 대부분 수행할 수 있습니다.
1단계: 클러스터 만들기
탐색 데이터 분석 및 데이터 엔지니어링을 수행하기 위해 명령 실행에 필요한 컴퓨팅 리소스를 제공하도록 클러스터를 만듭니다.
- 사이드바에서 컴퓨팅을 클릭합니다.
- 사이드바에서 새로 만들기를 클릭한 다음 클러스터를 선택합니다. 그러면 새 클러스터 컴퓨팅 페이지가 열립니다.
- 클러스터에 대해 고유한 이름을 지정합니다.
- 단일 노드 라디오 단추를 선택합니다.
- 액세스 모드 드롭다운에서 단일 사용자를 선택합니다.
- 이메일 주소가 단일 사용자 필드에 표시되는지 확인합니다.
- Unity 카탈로그를 사용하려면 원하는 Databricks 런타임 버전 11.1 이상을 선택합니다.
- 클러스터를 만드려면 컴퓨팅 만들기를 클릭합니다.
Databricks 클러스터에 대한 자세한 내용은 컴퓨팅을 참조하세요.
2단계: Databricks Notebook 만들기
작업 영역에서 Notebook을 생성하려면 사이드바의 새로 만들기를 클릭한 다음 Notebook을 클릭합니다. 작업 영역에서 빈 전자 필기장이 열립니다.
Notebook 만들기 및 관리에 대한 자세한 내용은 Notebook 관리를 참조하세요.
3단계: Unity 카탈로그로 관리되는 외부 위치에서 데이터 쓰기 및 읽기
Databricks는 증분 데이터 수집을 위해 자동 로더를 사용할 것을 권장합니다. 자동 로더는 클라우드 개체 스토리지에 도착하는 새 파일을 자동으로 감지하고 처리합니다.
유니티 카탈로그를 사용하여 외부 위치에 대한 보안 액세스를 관리합니다. 외부 위치에 READ FILES
권한이 있는 사용자 또는 서비스 주체는 Auto Loader를 사용하여 데이터를 수집할 수 있습니다.
일반적으로 데이터는 다른 시스템의 쓰기로 인해 외부 위치에 도착합니다. 이 데모에서는 JSON 파일을 외부 위치에 기록하여 데이터 도착을 시뮬레이션할 수 있습니다.
아래 코드를 Notebook 셀에 복사합니다. catalog
의 문자열 값을 CREATE CATALOG
및 USE CATALOG
권한이 있는 카탈로그 이름으로 바꿉니다. external_location
의 문자열 값을 READ FILES
, WRITE FILES
및 CREATE EXTERNAL TABLE
권한이 있는 외부 위치의 경로로 바꿉니다.
외부 위치를 전체 스토리지 컨테이너로 정의할 수 있지만 종종 컨테이너에 중첩된 디렉터리를 가리킵니다.
외부 위치 경로의 올바른 형식은 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
입니다.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
이 셀을 실행하려면 12바이트를 읽는 줄을 인쇄하고 문자열 "Hello world!"를 인쇄한 다음 제공된 카탈로그에 있는 모든 데이터베이스를 표시해야 합니다. 이 셀을 실행할 수 없으면 Unity 카탈로그 지원 작업 영역에 있는지 확인하고 이 자습서를 완료하기 위해 필요한 적절한 권한을 작업 영역 관리자에게 요청하세요.
아래 Python 코드는 이메일 주소를 사용해서 제공된 카탈로그에 고유한 데이터베이스를 만들고 제공된 외부 위치에 고유한 스토리지 위치를 만듭니다. 이 셀을 실행하면 이 자습서와 연관된 모든 데이터가 제거되어 이 예제를 멱등적으로 실행할 수 있습니다. 연결된 시스템에서 소스 외부 위치로 도착하는 데이터 배치를 시뮬레이션하는 데 사용할 클래스가 정의되고 인스턴스화됩니다.
이 코드를 Notebook의 새 셀에 복사하고 이를 실행해서 환경을 구성합니다.
참고 항목
이 코드에 정의된 변수를 사용하면 기존 작업 영역 자산 또는 다른 사용자와의 충돌 위험 없이 안전하게 실행할 수 있습니다. 제한된 네트워크 또는 스토리지 권한은 이 코드를 실행할 때 오류를 발생시킵니다. 이러한 제한 사항을 해결하려면 작업 영역 관리자에게 문의하세요.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
이제 다음 코드를 셀에 복사하여 실행함으로써 일괄 데이터를 수집할 수 있습니다. 이 셀을 수동으로 최대 60회까지 실행하여 새 데이터 도착을 트리거할 수 있습니다.
RawData.land_batch()
4단계: Unity 카탈로그로 데이터를 수집하도록 자동 로더 구성
Databricks에서는 Delta Lake를 사용하여 데이터를 저장하는 것이 좋습니다. Delta Lake는 ACID 트랜잭션을 제공하고 데이터 레이크하우스를 가능하게 하는 오픈 소스 스토리지 레이어입니다. Delta Lake는 Databricks에서 만든 테이블의 기본 형식입니다.
Unity 카탈로그 테이블에 데이터를 수집하도록 자동 로더를 구성하려면 다음 코드를 복사하여 Notebook의 빈 셀에 붙여넣습니다.
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
자동 로더에 대한 자세한 내용은 자동 로더란?을 참조하세요.
Unity 카탈로그를 사용하는 구조적 스트리밍에 대한 자세한 내용은 구조적 스트리밍에 Unity 카탈로그 사용을 참조하세요.
5단계: 데이터 처리 및 상호 작용
Notebook은 셀 단위로 논리 셀을 실행합니다. 다음 단계에 따라 셀에서 논리를 실행합니다.
이전 단계에서 완료한 셀을 실행하려면 셀을 선택하고 Shift+Enter를 누릅니다.
방금 만든 테이블을 쿼리하려면 다음 코드를 복사하여 빈 셀에 붙여넣은 다음, Shift+Enter를 눌러 셀을 실행합니다.
df = spark.read.table(table)
DataFrame에서 데이터를 미리 보려면 다음 코드를 복사하여 빈 셀에 붙여넣은 다음, Shift+Enter를 눌러 셀을 실행합니다.
display(df)
데이터 시각화를 위한 대화형 옵션에 대한 자세한 내용은 Databricks Notebook의 시각화를 참조하세요.
6단계: 작업 예약
Databricks 작업에 추가하여 Databricks Notebook을 프로덕션 스크립트로 실행할 수 있습니다. 이 단계에서는 수동으로 트리거할 수 있는 새 작업을 만듭니다.
Notebook을 작업으로 예약하려면 다음을 수행합니다.
- 머리글 표시줄 오른쪽에서 예약을 클릭합니다.
- 작업 이름에 고유한 이름을 입력합니다.
- 수동을 클릭합니다.
- 클러스터 드롭다운에서 1단계에서 만든 클러스터를 선택합니다.
- 만들기를 클릭합니다.
- 표시된 창에서 지금 실행을 클릭합니다.
- 작업 실행 결과를 보려면 마지막 실행 타임스탬프 옆에 있는 아이콘을 클릭합니다.
작업에 대한 자세한 내용은 Databricks 작업이란?을 참조하세요.
7단계: Databricks SQL에서 테이블 쿼리
현재 카탈로그에 대한 USE CATALOG
권한, 현재 스키마에 대한 USE SCHEMA
권한 및 테이블에 대한 SELECT
권한이 있는 사용자는 누구나 기본 설정하는 Databricks API에서 테이블의 콘텐츠를 쿼리할 수 있습니다.
Databricks SQL에서 쿼리를 실행하려면 실행 중인 SQL 웨어하우스에 액세스할 수 있어야 합니다.
이 자습서의 앞부분에서 만든 테이블의 이름은 target_table
입니다. 첫 번째 셀에서 제공한 카탈로그와 e2e_lakehouse_<your-username>
패턴의 데이터베이스를 사용하여 쿼리할 수 있습니다. 카탈로그 탐색기를 사용하여 생성된 데이터 개체를 찾을 수 있습니다.
추가 통합
Azure Databricks를 사용한 데이터 엔지니어링을 위한 통합 및 도구에 대해 자세히 알아봅니다.