次の方法で共有


半構造化バリアント型としてデータを取り込む

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 15.3 以降では、VARIANT 型を使用して半構造化データを取り込むことができます。 この記事では、動作について説明し、自動ローダーと COPY INTO を使用してクラウド オブジェクト ストレージからデータを取り込み、Kafka からレコードをストリーミングするためのパターン例を示し、バリアント データを含む新しいテーブルを作成し、バリアント型を使用して新しいレコードを挿入するための SQL コマンドを紹介します。

バリアント データにクエリを実行する」を参照してください。

バリアント列を持つテーブルを作成する

VARIANT は Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってバックアップされるテーブルでサポートされています。 Azure Databricks のマネージド テーブルでは、既定で Delta Lake が使用されるため、次の構文を使用して 1 つの VARIANT 列を含む空のテーブルを作成できます。

CREATE TABLE table_name (variant_column VARIANT)

または、JSON 文字列で PARSE_JSON 関数を使用し、CTAS ステートメントを使用してバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を含むテーブルを作成します。

  • JSON 文字列から id 型として抽出された STRING 列。
  • variant_column 列には、VARIANT 型としてエンコードされた JSON 文字列全体が含まれます。
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Note

Databricks では、フィールドを、クエリの高速化とストレージ レイアウトの最適化のために使用しようとしている非バリアント列として抽出および保存することをお勧めしています。

VARIANT 列は、クラスタリング キー、パーティション、または Z オーダー キーには使用できません。 VARIANT データ型は、比較、グループ化、順序付け、集合の操作には使用できません。 制限事項の詳細な一覧については、「制限事項」を参照してください。

parse_json を使用してデータを挿入する

既にターゲット テーブルに VARIANT としてエンコードされた列が含まれている場合は、次の例のように、parse_json を使用して JSON 文字列レコードを VARIANT として挿入できます。

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

クラウド オブジェクト ストレージからデータをバリアントとして取り込む

Databricks Runtime 15.3 以降では、自動ローダーを使用して、JSON ソースからすべてのデータを 1 つの VARIANT 列としてターゲット テーブルに読み込むことができます。 VARIANT は、スキーマと型の変更に対して柔軟であり、データ ソースに存在する大文字と小文字の区別と NULL の値を維持するので、このパターンはほとんどのインジェスト シナリオで信頼性が高くなります。ただし、次の注意事項があります。

  • 形式に誤りがある JSON レコードは、VARIANT 型を使用してエンコードできません。
  • VARIANT 型に保持できるレコードのサイズは 16 MB までです。

Note

バリアントでは、過度に大きなレコードは破損したレコードと同様に処理されます。 既定の PERMISSIVE 処理モードでは、過度に大きなレコードは、形式に誤りがある JSON レコードと共に _malformed_data 列にキャプチャされます。

JSON ソースからのすべてのデータは 1 つの VARIANT 列として記録されるため、インジェスト中にスキーマ進化は発生しません。また、rescuedDataColumn はサポートされません。 次の例では、1 つの VARIANT 列を含むターゲット テーブルが既に存在することを想定しています。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

また、スキーマを定義、または VARIANT を渡すときに schemaHints を指定することもできます。 参照先のソース フィールドのデータには、有効な JSON 文字列が含まれている必要があります。 この構文の例を次に示します。

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

バリアントで COPY INTO を使用する

Databricks では、使用可能であれば COPY INTO に対して自動ローダーを使用することをお勧めしています。

COPY INTO では、JSON データ ソースの内容全体を 1 つの列として取り込むことがサポートされています。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

ターゲット テーブル内の任意のフィールドを VARIANT として定義することもできます。 COPY INTO を実行すると、次の例のように、データ ソース内の対応するフィールドが取り込まれ、VARIANT 型にキャストされます。

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Kafka データをバリアントとしてストリームする

多くの Kafka ストリームでは、JSON を使用してペイロードがエンコードされます。 VARIANT を使用して Kafka ストリームを取り込むと、スキーマが変更されてもこれらのワークロードの信頼性は高くなります。

次の例では、Kafka ストリーミング ソースを読み取り、keySTRING として、さらに valueVARIANT としてキャストし、ターゲット テーブルに書き出す方法を示しています。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)