다음을 통해 공유


Delta Live Tables 사용하여 데이터 로드

Delta Live Tables사용하여 Azure Databricks의 Apache Spark에서 지원하는 모든 데이터 원본에서 데이터를 로드할 수 있습니다. Delta Live Tables에서 Streaming DataFrame 및 Pandas for Spark DataFrame을 포함한 Spark DataFrame을 반환하는 모든 쿼리에 대해 데이터 세트(tables 및 views)를 정의할 수 있습니다. 데이터 수집 작업의 경우 Databricks는 대부분의 사용 사례에 스트리밍 tables 사용하는 것이 좋습니다. 클라우드 객체 저장소에서 자동 로더를 사용하거나 Kafka와 같은 메시지 버스로부터 데이터를 수집하는 데 스트리밍 tables이 적합합니다. 아래 예는 몇 가지 일반적인 패턴을 보여줍니다.

Important

모든 데이터 원본에 SQL이 지원되는 것은 아닙니다. Delta Live Tables 파이프라인에서 SQL과 Python Notebook을 혼합하여 수집 이외의 모든 작업에 SQL을 사용할 수 있습니다.

기본적으로 Delta Live Tables에 포함되지 않은 라이브러리를 사용하여 작업하는 방법에 대한 자세한 내용은 Delta Live Tables 파이프라인의Python 종속성 관리 항목을 참조하세요.

클라우드 오브젝트 스토리지에서 파일 로드

Databricks는 클라우드 개체 스토리지에서 대부분의 데이터 수집 작업에 델타 라이브 Tables 자동 로더를 사용하는 것이 좋습니다. 자동 로더(Auto Loader)와 Delta Live Tables는 클라우드 스토리지에 데이터가 도착할 때마다 증가하는 데이터를 증분적이고 멱등하게 로드하도록 설계되었습니다. 다음 예제에서는 자동 로더를 사용하여 CSV 및 JSON 파일에서 데이터 세트를 만듭니다.

참고 항목

Unity Catalog 사용하도록 설정된 파이프라인에서 자동 로더를 사용하여 파일을 로드하려면 외부 위치를 사용해야 합니다. Delta Live Tables와 함께 Unity Catalog을 사용하는 방법에 대해 더 알아보려면, Delta Live Tables 파이프라인에 Unity Catalog을 사용하십시오.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

자동 로더란?자동 로더 SQL 구문을 참조하세요.

Warning

자동 로더를 파일 알림과 함께 사용하고 파이프라인 refresh 전체를 실행하거나 스트리밍 table을 실행하는 경우 리소스를 수동으로 정리해야 합니다. Notebook에서 CloudFilesResourceManager 를 사용하여 정리를 수행할 수 있습니다.

메시지 버스에서 데이터 로드

스트리밍 tables사용하여 메시지 버스에서 데이터를 수집하도록 Delta Live Tables 파이프라인을 구성할 수 있습니다. Databricks는 메시지 버스에서 지연 시간이 짧은 로드를 위해 스트리밍 tables을 연속 실행 및 향상된 자동 스케일링과 결합하여 가장 효율적으로 수집할 것을 권장합니다. Optimize Delta Live Tables 파이프라인의 클러스터 사용률을 향상된 자동 크기 조정을 통해 확인하세요.

예를 들어, 다음 코드는 Kafka에서 데이터를 로드하도록 스트리밍 table을 구성합니다.

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

다음 예제와 같이 순수 SQL에서 다운스트림 작업을 작성하여 이 데이터에 대한 스트리밍 변환을 수행할 수 있습니다.

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Event Hubs 사용 예제는 Azure Event Hubs를 Delta Live Tables 데이터 소스로 사용하는 방법입니다.

스트리밍 데이터 원본 구성을 참조하세요.

외부 시스템에서 데이터 로드

Delta Live Tables Azure Databricks에서 지원하는 모든 데이터 원본에서 데이터 로드를 지원합니다. 데이터 원본에 대한 연결을 참조 하세요. 지원되는 데이터 원본에 대해 Lakehouse Federation을 사용하여 외부 데이터를 로드할 수도 있습니다. Lakehouse Federation에는 Databricks Runtime 13.3 LTS 이상이 필요하므로 Lakehouse 페더레이션을 사용하려면 파이프라인이 미리 보기 채널을 사용하도록 구성되어야 합니다.

일부 데이터 원본은 SQL에서 동등한 지원을 제공하지 않습니다. 이러한 데이터 원본 중 하나에서 Lakehouse Federation을 사용할 수 없는 경우 Python Notebook을 사용하여 원본에서 데이터를 수집할 수 있습니다. 동일한 Delta Live Tables 파이프라인에 Python 및 SQL 소스 코드를 추가할 수 있습니다. 다음 예제에서는 원격 PostgreSQL table데이터의 현재 상태에 액세스하기 위해 구체화된 뷰를 선언합니다.

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

클라우드 개체 스토리지에서 소규모 또는 정적 데이터 세트 로드

Apache Spark 로드 구문을 사용하여 작거나 정적 데이터 세트를 로드할 수 있습니다. Delta Live Tables Azure Databricks의 Apache Spark에서 지원하는 모든 파일 형식을 지원합니다. 전체 에 대해서는데이터 형식 옵션을 참조하세요.

다음 예제에서는 JSON을 로드하여 Delta Live Tablestables만드는 방법을 보여 줍니다.

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

참고 항목

SELECT * FROM format.`path`; SQL 구문은 Azure Databricks의 모든 SQL 환경에 공통적으로 적용됩니다. Delta Live TablesSQL을 사용하는 직접 파일 액세스에 권장되는 패턴입니다.

파이프라인에서 비밀을 사용하여 스토리지 credentials 안전하게 액세스

Azure Databricks 비밀 사용하여 액세스 키 또는 암호와 같은 credentials 저장할 수 있습니다. 파이프라인에서 비밀을 구성하려면 파이프라인 설정 클러스터 구성에서 Spark 속성을 사용합니다. Delta Live 파이프라인에 대한 컴퓨팅 구성을 참조하세요.

다음 예제에서는 비밀과 자동 로더를 사용하여 ADLS Gen2(Azure Data Lake Storage Gen2) 스토리지 계정에서 입력 데이터를 읽는 데 필요한 액세스 키를 저장합니다. 이 동일한 방법을 사용하여 파이프라인에 필요한 비밀(예: S3에 액세스하기 위한 AWS 키 또는 Apache Hive 메타스토어에 대한 암호)을 구성할 수 있습니다.

Azure Data Lake Storage Gen2 작업에 대한 자세한 내용은 Azure Data Lake Storage Gen2 및 Blob Storage에 대한 연결을 참조하세요.

참고 항목

비밀 값을 설정하는 spark.hadoop. 구성 키에 spark_conf 접두사를 추가해야 합니다.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • <scope-name>을 Azure Databricks 비밀 범위 이름으로 바꿉니다.
  • <secret-name>을 Azure Storage 계정 액세스 키가 포함된 키 이름으로 바꿉니다.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • 입력 데이터를 저장하는 Azure Storage 계정 컨테이너 이름을 사용하는 <container-name>입니다.
  • <storage-account-name>을 ADLS Gen2 스토리지 계정 이름으로 바꿉니다.
  • 입력 데이터 세트의 경로를 사용하는 <path-to-input-dataset>입니다.

Azure Event Hubs에서 데이터 로드

Azure Event Hubs는 Apache Kafka 호환 인터페이스를 제공하는 데이터 스트리밍 서비스입니다. Delta Live Tables 런타임에 포함된 구조적 스트리밍 Kafka 커넥터를 사용하여 Azure Event Hubs에서 메시지를 로드할 수 있습니다. Azure Event Hubs에서 메시지를 로드하고 처리하는 방법에 대한 자세한 내용은 Azure Event Hubs를 Delta Live Tables 데이터 원본으로 사용하는 것에 대해 참조하세요.