DLT를 사용하여 데이터 로드
DLT를 사용하여 Azure Databricks의 Apache Spark에서 지원하는 모든 데이터 원본에서 데이터를 로드할 수 있습니다. 스트리밍 데이터 프레임 및 Spark DataFrames용 Pandas를 포함하여 Spark DataFrame을 반환하는 모든 쿼리에 대해 DLT에서 데이터 세트(테이블 및 뷰)를 정의할 수 있습니다. 데이터 수집 작업의 경우 Databricks는 대부분의 사용 사례에 스트리밍 테이블을 사용하는 것이 좋습니다. 스트리밍 테이블은 자동 로더를 사용하거나 Kafka와 같은 메시지 버스에서 클라우드 개체 스토리지에서 데이터를 수집하는 데 적합합니다. 아래 예제에서는 몇 가지 일반적인 패턴을 보여 줍니다.
중요하다
모든 데이터 원본에 SQL이 지원되는 것은 아닙니다. DLT 파이프라인에서 SQL과 Python Notebook을 혼합하여 수집 이외의 모든 작업에 SQL을 사용할 수 있습니다.
기본적으로 DLT에서 패키지되지 않은 라이브러리 작업에 대한 자세한 내용은 DLT 파이프라인에 대한 Python 종속성 관리참조하세요.
클라우드 개체 스토리지에서 파일 로드
Databricks는 클라우드 개체 스토리지에서 대부분의 데이터 수집 작업에 DLT와 함께 자동 로더를 사용하는 것이 좋습니다. 자동 로더 및 DLT는 클라우드 스토리지에 도착하는 지속적으로 증가하는 데이터를 점진적이고 멱등하게 로드하도록 설계되었습니다. 다음 예제에서는 자동 로더를 사용하여 CSV 및 JSON 파일에서 데이터 세트를 만듭니다.
메모
Unity 카탈로그 사용 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. DLT에서 Unity 카탈로그를 사용하는 방법에 대한 자세한 내용은 DLT 파이프라인Unity 카탈로그 사용을 참조하세요.
파이썬
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
경고
파일 알림과 함께 자동 로더를 사용하고 파이프라인 또는 스트리밍 테이블에 대해 전체 새로 고침을 실행하는 경우 리소스를 수동으로 정리해야 합니다. Notebook에서 CloudFilesResourceManager 사용하여 정리를 수행할 수 있습니다.
메시지 버스에서 데이터 로드
스트리밍 테이블을 사용하여 메시지 버스에서 데이터를 수집하도록 DLT 파이프라인을 구성할 수 있습니다. Databricks는 메시지 버스에서의 낮은 지연 로드에 가장 효율적인 수집을 제공하기 위해 스트리밍 테이블을 연속 실행 및 향상된 자동 크기 조정과 결합할 것을 권장합니다. 향상된 자동 크기 조정 사용하여 DLT 파이프라인의 클러스터 사용률 최적화참조하세요.
예를 들어 다음 코드는 Kafka에서 데이터를 수집하도록 스트리밍 테이블을 구성합니다.
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
다음 예제와 같이 순수 SQL에서 다운스트림 작업을 작성하여 이 데이터에 대한 스트리밍 변환을 수행할 수 있습니다.
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Event Hubs를 활용하는 예에 대해서는 Azure Event Hubs를 DLT 데이터 원본으로 사용하는 방법을 참조하세요.
외부 시스템에서 데이터 로드
DLT는 Azure Databricks에서 지원하는 모든 데이터 원본에서 데이터 로드를 지원합니다. 데이터 원본에 연결하기를참조하세요. Lakehouse Federation을 사용하여 지원되는 데이터 원본 의 외부 데이터를 로드할 수도 있습니다. Lakehouse Federation에는 Databricks Runtime 13.3 LTS 이상이 필요하므로 Lakehouse Federation을 사용하려면 파이프라인이 미리 보기 채널사용하도록 구성되어야 합니다.
일부 데이터 원본은 SQL에서 동등한 지원을 제공하지 않습니다. 이러한 데이터 원본 중 하나에서 Lakehouse Federation을 사용할 수 없는 경우 Python Notebook을 사용하여 원본에서 데이터를 수집할 수 있습니다. Python 및 SQL 소스 코드를 동일한 DLT 파이프라인에 추가할 수 있습니다. 다음 예제에서는 구체화된 뷰를 선언하여 원격 PostgreSQL 테이블의 현재 데이터 상태에 액세스합니다.
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
클라우드 개체 스토리지에서 작은 데이터 세트 또는 정적 데이터 세트 로드
Apache Spark 로드 구문을 사용하여 작거나 정적 데이터 세트를 로드할 수 있습니다. DLT는 Azure Databricks의 Apache Spark에서 지원하는 모든 파일 형식을 지원합니다. 전체 목록은 데이터 형식 옵션을 참조하세요.
다음 예제에서는 DLT 테이블을 만들기 위해 JSON을 로드하는 방법을 보여 줍니다.
파이썬
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
메모
SELECT * FROM format.`path`;
SQL 구문은 Azure Databricks의 모든 SQL 환경에 공통적으로 적용됩니다. DLT와 함께 SQL을 사용하는 직접 파일 액세스에 권장되는 패턴입니다.
비밀을 사용하여 파이프라인에서 스토리지 자격 증명에 안전하게 액세스하기
Azure Databricks 비밀 사용하여 액세스 키 또는 암호와 같은 자격 증명을 저장할 수 있습니다. 파이프라인에서 비밀을 구성하려면 파이프라인 설정 클러스터 구성에서 Spark 속성을 사용합니다. DLT 파이프라인 대한 컴퓨팅 구성참조하세요.
다음 예제에서는 비밀을 사용하여 자동 로더사용하여 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정에서 입력 데이터를 읽는 데 필요한 액세스 키를 저장합니다. 이 동일한 방법을 사용하여 파이프라인에 필요한 비밀(예: S3에 액세스하기 위한 AWS 키 또는 Apache Hive 메타스토어에 대한 암호)을 구성할 수 있습니다.
Azure Data Lake Storage Gen2 작업에 대한 자세한 내용은 Connect to Azure Data Lake Storage Gen2 및 Blob Storage참조하세요.
메모
비밀 값을 설정하는 spark_conf
구성 키에 spark.hadoop.
접두사를 추가해야 합니다.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
교체하다
-
<storage-account-name>
을 ADLS Gen2 스토리지 계정 이름으로 지정합니다. - Azure Databricks 비밀 범위 이름으로
<scope-name>
. - Azure Storage 계정 액세스 키를 포함하는 키의 이름이
<secret-name>
입니다.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
대체하다
-
<container-name>
을(를) 입력 데이터를 저장하는 Azure Storage 계정 컨테이너의 이름으로 사용하십시오. -
<storage-account-name>
는 ADLS Gen2 스토리지 계정 이름입니다. - 입력 데이터 세트의 경로를
<path-to-input-dataset>
에 지정합니다.
Azure Event Hubs에서 데이터 로드
Azure Event Hubs는 Apache Kafka 호환 인터페이스를 제공하는 데이터 스트리밍 서비스입니다. DLT 런타임에 포함된 구조적 스트리밍 Kafka 커넥터를 사용하여 Azure Event Hubs에서 메시지를 로드할 수 있습니다. Azure Event Hubs에서 메시지를 로드하고 처리하는 방법에 대한 자세한 내용은 Azure Event Hubs를 DLT 데이터 원본사용하는 방법을 참조하세요.