자습서: 첫 번째 Delta Live Tables 파이프라인 실행
이 자습서에서는 첫 번째 Delta Live Tables 파이프라인을 구성하고, 기본 ETL 코드를 작성하고, 파이프라인 update실행하는 단계를 안내합니다.
이 자습서의 모든 단계는 Unity Catalog 사용하도록 설정된 작업 영역을 위해 설계되었습니다. 또한 레거시 Hive 메타스토어와 작동하도록 Delta Live Tables 파이프라인을 구성할 수 있습니다. 델타 라이브 Tables 파이프라인을 레거시 Hive 메타스토어와 함께 사용하십시오.
참고 항목
이 자습서에는 Databricks Notebook을 사용하여 새 파이프라인 코드를 개발하고 유효성을 검사하기 위한 지침이 있습니다. Python 또는 SQL 파일의 소스 코드를 사용하여 파이프라인을 구성할 수도 있습니다.
Delta Live Tables 구문을 사용하여 작성된 소스 코드가 이미 있는 경우 코드를 실행하도록 파이프라인을 구성할 수 있습니다. Delta Live Tables 파이프라인구성을 참조하세요.
Databricks SQL에서 완전히 선언적 SQL 구문을 사용하여 구체화된 views 및 스트리밍 tables을 Unity Catalog에서 관리되는 개체로 등록하고, 그에 대한 일정을 setrefresh할 수 있습니다.
예: 뉴욕 아기 이름 데이터 수집 및 처리
이 문서의 예제에서는 뉴욕 주 아기 이름의 레코드를 포함하는 공개적으로 사용할 수 있는 데이터 세트를 사용합니다. 이 예제에서는 Delta Live Tables 파이프라인을 사용하여 다음을 수행합니다.
- table으로 볼륨의 원시 CSV 데이터를 읽습니다.
- 수집 table 레코드를 읽고 Delta Live Tables기대치를 사용하여 정리된 데이터를 포함하는 새로운 table를 만듭니다.
- 파생 데이터 세트를 만드는 Delta Live Tables 쿼리에 대한 입력으로 정리된 레코드를 사용합니다.
이 코드는 Medallion 아키텍처의 간소화된 예제를 보여 줍니다. Medallion 레이크하우스 아키텍처란?을 참조하세요.
이 예제의 구현은 Python 및 SQL에 대해 제공됩니다. 단계에 따라 새 파이프라인 및 Notebook을 만든 다음 제공된 코드를 복사하여 붙여넣습니다.
전체 코드가 있는 예제 Notebook도 제공됩니다.
요구 사항
파이프라인을 시작하려면 클러스터 만들기 권한 또는 Delta Live Tables 클러스터를 정의하는 클러스터 정책에 대한 액세스 권한이 있어야 합니다. Delta Live Tables 런타임은 파이프라인을 실행하기 전에 클러스터를 만들고 올바른 권한이 없으면 실패합니다.
모든 사용자는 기본적으로 서버리스 파이프라인을 사용하여 업데이트를 트리거할 수 있습니다. 서버리스는 계정 수준에서 사용하도록 설정해야 하며 작업 영역 지역에서 사용할 수 없을 수도 있습니다. 서버리스 컴퓨팅 사용을 참조하세요.
이 자습서의 예제에서는 Unity Catalog사용합니다. Databricks는 여러 데이터베이스 개체가 대상 schema에 생성되므로, 이 자습서를 실행하기 위해 새 schema을 만드는 것을 권장합니다.
-
catalog에서 새 schema를 만들려면
ALL PRIVILEGES
또는USE CATALOG
그리고CREATE SCHEMA
권한이 있어야 합니다. - 새 schema만들 수 없는 경우 기존 schema대해 이 자습서를 실행합니다. 다음과 같은 권한이 있어야 합니다.
- 부모 catalog에 대한
USE CATALOG
. - 대상 schema에 대한
ALL PRIVILEGES
,USE SCHEMA
,CREATE MATERIALIZED VIEW
및CREATE TABLE
권한.
- 부모 catalog에 대한
- 이 자습서에서는 볼륨을 사용하여 샘플 데이터를 저장합니다. Databricks는 이 자습서에 대한 새 볼륨을 만드는 것이 좋습니다. 이 자습서에 대한 새 schema을 생성하면 해당 schema에서 새 볼륨을 만들 수 있습니다.
- 기존 schema새 볼륨을 만들려면 다음 권한이 있어야 합니다.
- 부모 catalog에 대한
USE CATALOG
. - 대상 schema에 대한
ALL PRIVILEGES
또는USE SCHEMA
그리고CREATE VOLUME
권한.
- 부모 catalog에 대한
- 필요에 따라 기존 볼륨을 사용할 수 있습니다. 다음과 같은 권한이 있어야 합니다.
- 부모 catalog에 대한
USE CATALOG
. - 부모 schema에 대한
USE SCHEMA
. -
ALL PRIVILEGES
READ VOLUME
또는WRITE VOLUME
대상 볼륨에 있습니다.
- 부모 catalog에 대한
- 기존 schema새 볼륨을 만들려면 다음 권한이 있어야 합니다.
이러한 권한을 set 하려면 Databricks 관리자에게 문의하세요. Unity Catalog 권한에 대한 자세한 내용은 Unity Catalog 권한 및 보안 개체참조하세요.
-
catalog에서 새 schema를 만들려면
0단계: 데이터 다운로드
다음은 Unity Catalog 볼륨에서 데이터를 로드하는 예제입니다. 다음 코드는 CSV 파일을 다운로드하여 지정된 볼륨에 저장합니다. 새 Notebook을 열고 다음 코드를 실행하여 이 데이터를 지정된 볼륨에 다운로드합니다.
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
<catalog-name>
, <schema-name>
및 <volume-name>
를 Unity Catalog 볼륨의 catalog, schema및 볼륨 이름으로 변경합니다. 제공된 코드는 이러한 개체가 없는 경우 지정된 schema 및 볼륨을 만들려고 시도합니다. Unity Catalog개체를 만들고 쓸 수 있는 적절한 권한을 가지고 있어야 합니다.
요구 사항을 참조하세요.
참고 항목
자습서를 계속하기 전에 이 Notebook이 성공적으로 실행되었는지 확인합니다. 이 Notebook을 파이프라인의 일부로 구성하지 마세요.
1단계: 파이프라인 만들기
Delta Live Tables은 Delta Live Tables 구문을 사용하여 Notebook 또는 파일에 정의된 종속성(이를 소스 코드이라고 함)을 해결함으로써 파이프라인을 만듭니다. 각 소스 코드 파일에는 하나의 언어만 포함될 수 있지만 파이프라인에 여러 언어별 Notebook 또는 파일을 추가할 수 있습니다.
Important
소스 코드 필드에 자산을 구성하지 마세요. 이 필드를 검은색으로 두면 소스 코드 작성을 위한 Notebook이 만들어지고 구성됩니다.
이 자습서의 지침에서는 서버리스 컴퓨팅 및 Unity Catalog사용합니다. 이러한 지침에 지정되지 않은 모든 구성 옵션에 대한 기본 설정을 사용합니다.
참고 항목
작업 영역에서 서버리스를 사용하도록 설정하거나 지원하지 않는 경우 기본 컴퓨팅 설정을 사용하여 작성된 자습서를 완료할 수 있습니다.
새 파이프라인을 구성하려면 다음을 수행합니다.
- 사이드바에서 Delta Live Tables를 클릭합니다.
- 파이프라인만들기
클릭합니다. - 파이프라인 이름: 고유한 파이프라인 이름을 입력합니다.
- Select 서버리스 확인란.
- Unity Catalog 위치 wheretables가 구성되도록 대상에서 게시되며, Catalog 및 Schemaselect가 포함됩니다.
-
고급에서 구성 추가를 클릭한 다음, 다음 매개 변수 이름을 사용하여 데이터를 다운로드한 catalog, schema및 볼륨에 대해 파이프라인 parameters를 정의합니다.
my_catalog
my_schema
my_volume
- 만들기를 클릭합니다.
새 파이프라인에 대한 파이프라인 UI가 나타납니다. 소스 코드 Notebook은 파이프라인에 대해 자동으로 만들어지고 구성됩니다.
Notebook은 사용자 디렉터리의 새 디렉터리에 만들어집니다. 새 디렉터리 및 파일의 이름이 파이프라인의 이름과 일치합니다. 예들 들어 /Users/your.username@databricks.com/my_pipeline/my_pipeline
입니다.
이 Notebook에 액세스할 수 있는 링크는 파이프라인 세부 정보 패널의 소스 코드 필드 아래에 있습니다. 다음 단계로 진행하기 전에 링크를 클릭하여 전자 필기장을 엽니다.
2단계: Python 또는 SQL을 사용하여 노트북 환경에서 구현된 views 및 스트리밍 tables 선언
Datbricks Notebook을 사용하여 Delta Live Tables 파이프라인에 대한 소스 코드를 대화형으로 개발하고 유효성을 검사할 수 있습니다. 이 기능을 사용하려면 Notebook을 파이프라인에 연결해야 합니다. 방금 만든 파이프라인에 새로 만든 Notebook을 연결하려면 다음을 수행합니다.
- 오른쪽 위에서 연결을 클릭하여 컴퓨팅 구성 메뉴를 엽니다.
- 1단계에서 만든 파이프라인의 이름을 마우스로 가리킵니다.
- 연결을 클릭합니다.
오른쪽 위에 유효성 검사 및 시작 단추를 포함하도록 UI가 변경됩니다. 파이프라인 코드 개발을 위한 노트북 지원에 대한 자세한 내용은 노트북에서 Delta Live Tables 파이프라인을 개발 및 디버그하기를 참조하세요.
Important
- Delta Live Tables 파이프라인은 계획 단계에서 노트북의 모든 셀을 평가합니다. 다목적 컴퓨팅에 대해 실행되거나 작업으로 예약된 Notebook과 달리 파이프라인은 셀이 지정된 순서로 실행되도록 보장하지 않습니다.
- Notebook은 단일 프로그래밍 언어만 포함할 수 있습니다. 파이프라인 소스 코드 Notebook에서 Python 및 SQL 코드를 혼합하지 마세요.
Python 또는 SQL을 사용하여 코드를 개발하는 방법에 대한 자세한 내용은 Python을 사용하여 파이프라인 코드 개발 또는 SQL을 사용하여 파이프라인 코드 개발을 참조하세요.
예제 파이프라인 코드
이 자습서의 예제를 구현하려면 다음 코드를 복사하여 파이프라인의 소스 코드로 구성된 Notebook의 셀에 붙여넣습니다.
제공된 코드는 다음을 수행합니다.
- 필요한 모듈을 가져옵니다(Python에만 해당).
- 파이프라인 구성 중에 정의된 참조 parameters.
- 볼륨에서 데이터를 수집하는 스트리밍 table을(를)
baby_names_raw
으로 정의합니다. - 수집된 데이터의 유효성을 검사하는 구체화된 뷰를
baby_names_prepared
정의합니다. - 데이터의 고도로 구체화된 뷰가 있는 명명
top_baby_names_2021
된 구체화된 뷰를 정의합니다.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
3단계: 파이프라인 update 시작
Notebook UI의 오른쪽 위에 있는 시작 단추를 클릭하여 파이프라인 update을 시작합니다.
예제 Notebook
다음 Notebook에는 이 문서에 제공된 것과 동일한 코드 예제가 포함되어 있습니다. 이러한 Notebook은 이 문서의 단계와 동일한 요구 사항을 갖습니다. 요구 사항을 참조하세요.
Notebook을 가져오려면 다음 단계를 완료합니다.
- Notebook UI를 엽니다.
- + 새> 클릭합니다.
- 빈 전자 필기장이 열립니다.
- File>Import를 차례로 클릭합니다. 가져오기 대화 상자가 나타납니다.
에서 가져오기에 대한 URL 옵션.- Notebook의 URL을 붙여넣습니다.
- 가져오기를 클릭합니다.
이 자습서에서는 Delta Live Tables 파이프라인을 구성하고 실행하기 전에 데이터 설정 Notebook을 실행해야 합니다. 다음 Notebook을 가져오고, 컴퓨팅 리소스에 Notebook을 연결하고, 필요한 변수를 my_catalog
my_schema
입력하고my_volume
, 모두 실행을 클릭합니다.
파이프라인용 데이터 다운로드 자습서
다음 Notebook은 Python 또는 SQL의 예제를 제공합니다. 전자 필기장을 가져오면 사용자 홈 디렉터리에 저장됩니다.
아래 Notebook 중 하나를 가져온 후 파이프라인을 만드는 단계를 완료하지만 소스 코드 파일 선택기를 사용하여 다운로드한 Notebook을 select. 소스 코드로 구성된 노트북을 사용하여 파이프라인을 만든 후, 파이프라인 UI에서 시작을 클릭하여 update를 트리거합니다.