다음을 통해 공유


반구조화된 변형 형식으로 데이터 수집

Important

이 기능은 공개 미리 보기 상태입니다.

Databricks Runtime 15.3 이상에서는 이 형식을 VARIANT 사용하여 반구조화된 데이터를 수집할 수 있습니다. 이 문서에서는 동작을 설명하고 자동 로더를 사용하여 클라우드 개체 스토리지에서 데이터를 수집하기 위한 예제 패턴, COPY INTOKafka에서 레코드 스트리밍, 변형 데이터로 새 테이블을 만들거나 변형 형식을 사용하여 새 레코드를 삽입하는 SQL 명령을 제공합니다.

쿼리 변형 데이터를 참조하세요.

variant 열이 있는 테이블 만들기

VARIANT 는 Databricks Runtime 15.3 이상의 표준 SQL 형식이며 Delta Lake에서 지원하는 테이블에서 지원됩니다. Azure Databricks의 관리되는 테이블은 기본적으로 Delta Lake를 사용하므로 다음 구문을 사용하여 단일 VARIANT 열이 있는 빈 테이블을 만들 수 있습니다.

CREATE TABLE table_name (variant_column VARIANT)

또는 JSON 문자열의 PARSE_JSON 함수를 사용하여 CTAS 문을 사용하여 변형 열이 있는 테이블을 만들 수 있습니다. 다음 예제에서는 두 개의 열이 있는 테이블을 만듭니다.

  • id 형식으로 JSON 문자열에서 추출된 열입니다STRING.
  • 열에는 variant_column 형식으로 VARIANT 인코딩된 전체 JSON 문자열이 포함됩니다.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

참고 항목

VARIANT 열은 클러스터링 키, 파티션 또는 Z 순서 키에 사용할 수 없습니다. 형식으로 VARIANT 저장된 데이터는 비교 및 순서 지정에 사용할 수 없습니다.

Databricks는 쿼리를 가속화하고 스토리지 레이아웃을 최적화하는 데 사용할 비 변형 열로 필드를 추출하고 저장하는 것이 좋습니다.

를 사용하여 데이터 삽입 parse_json

대상 테이블에 이미 인코딩된 VARIANT열이 있는 경우 다음 예제와 같이 VARIANTJSON 문자열 레코드를 삽입하는 데 사용할 parse_json 수 있습니다.

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

클라우드 개체 스토리지에서 데이터를 변형으로 수집

Databricks Runtime 15.3 이상에서는 자동 로더를 사용하여 JSON 원본의 모든 데이터를 대상 테이블의 단일 VARIANT 열로 로드할 수 있습니다. VARIANT 스키마 및 형식 변경에 유연하고 데이터 원본에 있는 대/소문자 민감도 및 NULL 값을 유지 관리하므로 이 패턴은 다음과 같은 주의 사항이 있는 대부분의 수집 시나리오에 강력합니다.

  • 형식이 잘못된 JSON 레코드는 형식을 사용하여 VARIANT 인코딩할 수 없습니다.
  • VARIANT type은 최대 16mb 크기의 레코드만 보유할 수 있습니다.

참고 항목

Variant는 손상된 레코드와 비슷하게 지나치게 큰 레코드 레코드를 처리합니다. 기본 PERMISSIVE 처리 모드에서는 형식이 잘못된 JSON 레코드와 함께 열에 _malformed_data 지나치게 큰 레코드가 캡처됩니다.

JSON 원본의 모든 데이터는 단일 VARIANT 열로 기록되므로 수집 중에 스키마가 진화하지 않으며 rescuedDataColumn 지원되지 않습니다. 다음 예제에서는 대상 테이블이 이미 단일 VARIANT 열에 있다고 가정합니다.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

스키마를 정의하거나 전달할 schemaHints때 지정할 VARIANT 수도 있습니다. 참조된 원본 필드의 데이터에는 유효한 JSON 문자열이 포함되어야 합니다. 다음 예제에서는 이 구문을 보여 줍니다.

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

변형과 함께 사용 COPY INTO

Databricks는 사용 가능한 경우 자동 로더를 COPY INTO 사용하는 것이 좋습니다.

COPY INTO 에서는 JSON 데이터 원본의 전체 콘텐츠를 단일 열로 수집할 수 있습니다. 다음 예제에서는 단일 VARIANT 열이 있는 새 테이블을 만든 다음 JSON 파일 원본에서 레코드를 수집하는 데 사용합니다 COPY INTO .

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

대상 테이블의 필드를 다음과 같이 VARIANT정의할 수도 있습니다. 실행 COPY INTO하면 다음 예제와 같이 데이터 원본의 해당 필드가 수집되어 형식으로 VARIANT 캐스팅됩니다.

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Kafka 데이터를 변형으로 스트리밍

많은 Kafka 스트림은 JSON을 사용하여 페이로드를 인코딩합니다. Kafka 스트림을 사용하여 VARIANT 수집하면 이러한 워크로드를 스키마 변경에 강력하게 적용할 수 있습니다.

다음 예제에서는 Kafka 스트리밍 원본을 읽고, as 및 as를 STRING 캐스팅 key 하고 value VARIANT, 대상 테이블에 쓰는 방법을 보여 줍니다.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)