다음을 통해 공유


Delta Live Tables SQL 언어 참조

이 문서에서는 Delta Live Tables SQL 프로그래밍 인터페이스에 대한 세부 정보를 설명합니다.

SQL 쿼리에서 Python UDF(사용자 정의 함수)를 사용할 수 있지만 SQL 원본 파일에서 호출하기 전에 Python 파일에서 이러한 UDF를 정의해야 합니다. 사용자 정의 스칼라 함수 - Python을 참조하세요.

제한 사항

PIVOT 절은 지원되지 않습니다. Spark에서 pivot 작업을 수행하려면 출력 스키마를 계산하기 위해 입력 데이터를 즉시 로드해야 합니다. 이 기능은 Delta Live Tables에서 지원되지 않습니다.

Delta Live Tables 구체화된 뷰 또는 스트리밍 테이블 만들기

참고 항목

  • 구체화된 뷰를 만드는 CREATE OR REFRESH LIVE TABLE 구문은 더 이상 사용되지 않습니다. 대신 CREATE OR REFRESH MATERIALIZED VIEW을 사용합니다.
  • CLUSTER BY 절을 사용하여 리퀴드 클러스터링을 사용하도록 설정하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.

스트리밍 테이블 또는 구체화된 뷰를 선언할 때도 동일한 기본 SQL 구문을 사용합니다.

SQL을 사용하여 델타 라이브 테이블 구체화된 뷰 선언

다음은 SQL을 사용하여 Delta Live Tables에서 구체화된 뷰를 선언하는 구문에 대해 설명합니다.

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

SQL을 사용하여 델타 라이브 테이블 스트리밍 테이블 선언

스트리밍 원본에 대해 읽은 쿼리를 사용해야만 스트리밍 테이블을 선언할 수 있습니다. Databricks에서는 클라우드 개체 스토리지에서 파일의 스트리밍 수집 시 자동 로더를 사용할 것을 권장합니다. 자동 로더 SQL 구문을 참조하세요.

파이프라인의 다른 테이블 또는 보기를 스트리밍 원본으로 지정할 때 데이터 세트 이름 주위에 STREAM() 함수를 포함해야 합니다.

다음은 SQL을 사용하여 Delta Live Tables에서 스트리밍 테이블을 선언하는 구문에 대해 설명합니다.

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Delta Live Tables 보기 만들기

다음은 SQL을 사용하여 보기를 선언하는 구문에 대해 설명합니다.

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

자동 로더 SQL 구문

다음은 SQL에서 자동 로더를 사용하기 위한 구문에 대해 설명합니다.

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

자동 로더에서 지원되는 형식 옵션을 사용할 수 있습니다. 함수를 map() 사용하여 메서드에 read_files() 옵션을 전달할 수 있습니다. 옵션은 키-값 쌍이며, 키와 값은 문자열입니다. 지원 형식 및 옵션에 대한 자세한 내용은 파일 형식 옵션을 참조하세요.

예: 테이블 정의

외부 데이터 원본 또는 파이프라인에 정의된 데이터 세트에서 읽어 데이터 세트를 만들 수 있습니다. 내부 데이터 세트에서 읽으려면 데이터 세트 이름 앞에 LIVE 키보드를 추가합니다. 다음 예제에서는 두 가지 데이터 세트(JSON 파일을 입력 원본으로 사용하는 taxi_raw 테이블과 taxi_raw 테이블을 입력으로 사용하는 filtered_data 테이블)를 정의합니다.

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

예: 스트리밍 원본에서 읽기

스트리밍 원본(예: 자동 로더 또는 내부 데이터 세트)에서 데이터를 읽으려면 테이블을 정의 STREAMING 합니다.

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

스트리밍 데이터에 대한 자세한 내용은 Delta Live Tables를 사용하여 데이터 전환을 참조하세요.

테이블 구체화 방법 제어

테이블은 또한 구체화에 대한 추가 제어를 제공합니다.

  • PARTITIONED BY를 사용하여 테이블을 분할하는 방법을 지정합니다. 분할을 사용하여 쿼리 속도를 높일 수 있습니다.
  • TBLPROPERTIES를 사용하여 테이블 속성을 설정할 수 있습니다. Delta Live Tables 테이블 속성을 참조하세요.
  • LOCATION 설정을 사용하여 스토리지 위치를 설정합니다. 기본적으로 테이블 데이터는 LOCATION이 설정되지 않은 경우 파이프라인 스토리지 위치에 저장됩니다.
  • 스키마 정의에서 생성된 열을 사용할 수 있습니다. 예제: 스키마 및 파티션 열 지정을 참고하세요.

참고 항목

크기가 1TB 미만인 테이블의 경우 Databricks는 Delta Live Tables가 데이터 조직을 제어하도록 하는 것이 좋습니다. 테이블이 테라바이트 이상으로 증가할 것으로 예상하지 않는 한 Databricks는 파티션 열을 지정하지 않는 것이 좋습니다.

예제: 스키마 및 파티션 열 지정을 참고하세요

필요에 따라 테이블을 정의할 때 스키마를 지정할 수 있습니다. 다음 예제에서는 Delta Lake에서 생성된 열 사용 및 테이블에 대한 파티션 열 정의를 포함하여 대상 테이블에 대한 스키마를 지정합니다.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

기본적으로 Delta Live Tables는 스키마를 지정하지 않으면 table 정의에서 스키마를 유추합니다.

예: 테이블 제약 조건 정의

참고 항목

테이블 제약 조건에 대한 Delta Live Tables 지원은 공개 미리 보기로 제공됩니다. 테이블 제약 조건을 정의하려면 파이프라인이 Unity 카탈로그 사용 파이프라인이어야 하며 preview 채널을 사용하도록 구성되어야 합니다.

스키마를 지정할 때 기본 키와 외래 키를 정의할 수 있습니다. 이러한 제약 조건은 정보 제공용일 뿐이며 적용되지 않습니다. SQL 언어 참조의 제약 조건 절을 참조하세요.

다음 예제에서는 기본 및 외래 키 제약 조건이 있는 테이블을 정의합니다.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

SQL을 사용하여 테이블 또는 보기를 선언할 때 사용되는 값 매개 변수화

SET를 사용하여 Spark 구성을 포함한 테이블 또는 보기를 선언하는 쿼리에서 구성 값을 지정합니다. SET 문 뒤에 Notebook에 정의한 테이블 또는 보기에서 정의된 값에 액세스할 수 있습니다. SET 문을 사용하여 지정된 모든 Spark 구성은 SET 문 다음에 오는 테이블 또는 보기에 대해 Spark 쿼리를 실행할 때 사용됩니다. 쿼리의 구성 값을 읽으려면 문자열 보간 구문 ${}를 사용합니다. 다음 예제에서는 startDate라는 Spark 구성 값을 설정하고 쿼리에서 해당 값을 사용합니다.

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

여러 구성 값을 지정하려면 각 값에 대해 별도의 SET 문을 사용합니다.

예: 행 필터 및 열 마스크 정의

Important

행 필터 및 열 마스크는 공개 미리 보기로 제공됩니다.

행 필터 및 열 마스크를 사용하여 구체화된 뷰 또는 스트리밍 테이블을 만들려면 ROW FILTER 절과 MASK 절사용합니다. 다음 예제에서는 행 필터와 열 마스크를 모두 사용하여 구체화된 뷰와 스트리밍 테이블을 정의하는 방법을 보여 줍니다.

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

행 필터 및 열 마스크에 대한 자세한 내용은 행 필터 및 열 마스크가 있는 테이블 게시를 참조하세요.

SQL 속성

참고 항목

CLUSTER BY 절을 사용하여 리퀴드 클러스터링을 사용하도록 설정하려면 미리 보기 채널을 사용하도록 파이프라인을 구성해야 합니다.

CREATE TABLE 또는 VIEW
TEMPORARY

테이블을 만들지만 테이블에 대한 메타데이터는 게시하지 않습니다. TEMPORARY 절은 Delta Live Tables에 파이프라인에서 사용할 수 있지만 파이프라인 외부에서 액세스해서는 안 되는 테이블을 만들도록 지시합니다. 처리 시간을 줄이기 위해 임시 테이블은 단일 업데이트가 아니라 해당 테이블을 만드는 파이프라인의 수명 동안 유지됩니다.
STREAMING

입력 데이터 세트를 스트림으로 읽는 테이블을 만듭니다. 입력 데이터 세트는 스트리밍 데이터 원본(예: 자동 로더 또는 STREAMING 테이블)이어야 합니다.
CLUSTER BY

테이블에서 리퀴드 클러스터링을 사용하도록 설정하고 클러스터링 키로 사용할 열을 정의합니다.

Delta 테이블에 Liquid 클러스터링 사용을 참조하세요.
PARTITIONED BY

테이블 분할에 사용할 하나 이상의 열에 대한 선택적 목록입니다.
LOCATION

테이블 데이터에 대한 선택적 스토리지 위치입니다. 설정하지 않으면 시스템은 기본적으로 파이프라인 스토리지 위치로 설정됩니다.
COMMENT

테이블에 대한 선택적 설명입니다.
column_constraint

열의 선택적 정보 기본 키 또는 외래 키 제약 조건입니다.
MASK clause(공개 미리 보기)

열 마스크 함수를 추가하여 중요한 데이터를 익명화합니다. 해당 열에 대한 이후 쿼리는 열의 원래 값 대신 평가된 함수의 결과를 반환합니다. 이는 함수가 값을 수정할지 여부를 결정하기 위해 사용자의 ID 및 그룹 구성원을 검사할 수 있는 세분화된 액세스 제어 목적에 유용합니다.

열 마스크 절을 참조하세요.
table_constraint

테이블의 선택적 정보 기본 키 또는 외래 키 제약 조건입니다.
TBLPROPERTIES

테이블에 대한 테이블 속성의 선택적 목록입니다.
WITH ROW FILTER clause (공개 미리 보기)

테이블에 행 필터 함수를 추가합니다. 해당 테이블에 대한 이후 쿼리는 함수가 TRUE로 평가되는 행의 하위 집합을 수신합니다. 이는 함수가 값을 수정할지 여부를 결정하기 위해 호출하는 ID 및 그룹 구성원을 검사할 수 있는 세분화된 액세스 제어 목적에 유용합니다.

행 필터 절을 참조하세요.
select_statement

테이블에 대한 데이터 세트를 정의하는 Delta Live Tables 쿼리입니다.
CONSTRAINT 절
EXPECT expectation_name

데이터 품질 제약 조건 expectation_name을 정의합니다. ON VIOLATION 제약 조건이 정의되지 않은 경우 제약 조건을 위반하는 행을 대상 데이터 세트에 추가합니다.
ON VIOLATION

실패한 행에 대해 수행할 선택적 작업:

- FAIL UPDATE: 파이프라인 실행을 즉시 중지합니다.
- DROP ROW: 레코드를 삭제하고 처리를 계속합니다.

Delta Live Tables에서 SQL로 데이터 캡처 변경

다음에 설명된 대로 APPLY CHANGES INTO 문을 사용하여 Delta Live Tables CDC 기능을 사용합니다.

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

APPLY CHANGES 쿼리와 동일한 CONSTRAINT 절을 사용하여 APPLY CHANGES 대상에 대한 데이터 품질 제약 조건을 정의합니다. Delta Live Tables를 사용하여 데이터 품질 관리를 참조하세요.

참고 항목

INSERTUPDATE 이벤트의 기본 동작은 원본의 CDC 이벤트를 upsert하는 것입니다. 즉, 대상 테이블에서 지정된 키와 일치하는 모든 행을 업데이트하거나, 대상 테이블에 일치하는 레코드가 없는 경우 새 행을 삽입합니다. DELETE 이벤트에 대한 처리는 APPLY AS DELETE WHEN 조건을 사용하여 지정할 수 있습니다.

Important

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. APPLY CHANGES 대상 테이블의 스키마를 지정할 때 sequence_by 필드와 데이터 형식이 동일한 __START_AT__END_AT 열도 포함해야 합니다.

APPLY CHANGES API: Delta Live Tables을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

KEYS

원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 대상 테이블에서 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다.

열 조합을 정의하려면 쉼표로 구분된 열 목록을 사용합니다.

이 절은 필수입니다.
IGNORE NULL UPDATES

대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 IGNORE NULL UPDATES가 지정된 경우, null을 포함하는 열은 대상에서 기존 값으로 유지됩니다. null 값을 갖는 중첩된 열에도 적용됩니다.

이 절은 옵션입니다.

기본값은 기존 열을 null 값으로 덮어쓰는 것입니다.
APPLY AS DELETE WHEN

CDC 이벤트를 upsert가 아닌 DELETE로 취급해야 하는 경우를 지정합니다. 순서가 맞지 않는 데이터를 처리하기 위해, 삭제된 행은 기본 Delta 테이블에서 일시적으로 삭제 표식으로 유지되고 메타스토어에서 이러한 삭제 표식이 제외된 뷰가 만들어집니다. 보존 간격은
pipelines.cdc.tombstoneGCThresholdInSeconds 테이블 속성.

이 절은 옵션입니다.
APPLY AS TRUNCATE WHEN

CDC 이벤트가 전체 테이블 TRUNCATE로 처리되어야 하는 경우를 지정합니다. 이 절은 대상 테이블의 전체 자르기를 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다.

APPLY AS TRUNCATE WHEN 절은 SCD 형식 1에 대해서만 지원됩니다. SCD 형식 2는 자르기 작업을 지원하지 않습니다.

이 절은 옵션입니다.
SEQUENCE BY

원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. Delta Live Tables는 이 시퀀싱을 사용하여 순서가 맞지 않게 도착한 변경 이벤트를 처리합니다.

지정된 열은 정렬 가능한 데이터 형식이어야 합니다.

이 절은 필수입니다.
COLUMNS

대상 테이블에 포함할 열의 하위 집합을 지정합니다. 구체적으로 다음과 같은 옵션을 선택할 수 있습니다.

- 포함할 열의 전체 목록을 지정합니다: COLUMNS (userId, name, city).
- 제외할 열의 목록을 지정합니다: COLUMNS * EXCEPT (operation, sequenceNum)

이 절은 옵션입니다.

기본값은 COLUMNS 절이 지정되지 않은 경우 대상 테이블에 모든 열을 포함하는 것입니다.
STORED AS

레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부.

이 절은 옵션입니다.

기본값은 SCD 형식 1입니다.
TRACK HISTORY ON

지정된 열이 변경될 때 기록 레코드를 생성할 출력 열의 하위 집합을 지정합니다. 구체적으로 다음과 같은 옵션을 선택할 수 있습니다.

- 추적할 열의 전체 목록을 지정합니다(COLUMNS (userId, name, city)).
- 추적에서 제외할 열 목록을 지정합니다(COLUMNS * EXCEPT (operation, sequenceNum)).

이 절은 옵션입니다. 기본값은 TRACK HISTORY ON *에 해당하는 변경 내용이 있는 경우 모든 출력 열에 대한 추적 기록입니다.