半構造化バリアント型としてデータを取り込む
重要
この機能はパブリック プレビュー段階にあります。
Databricks Runtime 15.3 以降では、VARIANT
型を使用して半構造化データを取り込むことができます。 この記事では、動作について説明し、自動ローダーと COPY INTO
を使用してクラウド オブジェクト ストレージからデータを取り込み、Kafka からレコードをストリーミングするためのパターン例を示し、バリアント データを含む新しいテーブルを作成し、バリアント型を使用して新しいレコードを挿入するための SQL コマンドを紹介します。
「バリアント データにクエリを実行する」を参照してください。
バリアント列を持つテーブルを作成する
VARIANT
は Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってバックアップされるテーブルでサポートされています。 Azure Databricks のマネージド テーブルでは、既定で Delta Lake が使用されるため、次の構文を使用して 1 つの VARIANT
列を含む空のテーブルを作成できます。
CREATE TABLE table_name (variant_column VARIANT)
または、JSON 文字列で PARSE_JSON
関数を使用し、CTAS ステートメントを使用してバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を含むテーブルを作成します。
- JSON 文字列から
id
型として抽出されたSTRING
列。 variant_column
列には、VARIANT
型としてエンコードされた JSON 文字列全体が含まれます。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Note
Databricks では、フィールドを、クエリの高速化とストレージ レイアウトの最適化のために使用しようとしている非バリアント列として抽出および保存することをお勧めしています。
VARIANT
列は、クラスタリング キー、パーティション、または Z オーダー キーには使用できません。 VARIANT
データ型は、比較、グループ化、順序付け、集合の操作には使用できません。 制限事項の詳細な一覧については、「制限事項」を参照してください。
parse_json
を使用してデータを挿入する
既にターゲット テーブルに VARIANT
としてエンコードされた列が含まれている場合は、次の例のように、parse_json
を使用して JSON 文字列レコードを VARIANT
として挿入できます。
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
クラウド オブジェクト ストレージからデータをバリアントとして取り込む
Databricks Runtime 15.3 以降では、自動ローダーを使用して、JSON ソースからすべてのデータを 1 つの VARIANT
列としてターゲット テーブルに読み込むことができます。 VARIANT
は、スキーマと型の変更に対して柔軟であり、データ ソースに存在する大文字と小文字の区別と NULL
の値を維持するので、このパターンはほとんどのインジェスト シナリオで信頼性が高くなります。ただし、次の注意事項があります。
- 形式に誤りがある JSON レコードは、
VARIANT
型を使用してエンコードできません。 VARIANT
型に保持できるレコードのサイズは 16 MB までです。
Note
バリアントでは、過度に大きなレコードは破損したレコードと同様に処理されます。 既定の PERMISSIVE
処理モードでは、過度に大きなレコードは、形式に誤りがある JSON レコードと共に _malformed_data
列にキャプチャされます。
JSON ソースからのすべてのデータは 1 つの VARIANT
列として記録されるため、インジェスト中にスキーマ進化は発生しません。また、rescuedDataColumn
はサポートされません。 次の例では、1 つの VARIANT
列を含むターゲット テーブルが既に存在することを想定しています。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
また、スキーマを定義、または VARIANT
を渡すときに schemaHints
を指定することもできます。 参照先のソース フィールドのデータには、有効な JSON 文字列が含まれている必要があります。 この構文の例を次に示します。
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
バリアントで COPY INTO
を使用する
Databricks では、使用可能であれば COPY INTO
に対して自動ローダーを使用することをお勧めしています。
COPY INTO
では、JSON データ ソースの内容全体を 1 つの列として取り込むことがサポートされています。 次の例では、1 つの VARIANT
列を持つ新しいテーブルを作成し、COPY INTO
を使用して JSON ファイル ソースからレコードを取り込みます。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
ターゲット テーブル内の任意のフィールドを VARIANT
として定義することもできます。 COPY INTO
を実行すると、次の例のように、データ ソース内の対応するフィールドが取り込まれ、VARIANT
型にキャストされます。
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Kafka データをバリアントとしてストリームする
多くの Kafka ストリームでは、JSON を使用してペイロードがエンコードされます。 VARIANT
を使用して Kafka ストリームを取り込むと、スキーマが変更されてもこれらのワークロードの信頼性は高くなります。
次の例では、Kafka ストリーミング ソースを読み取り、key
を STRING
として、さらに value
を VARIANT
としてキャストし、ターゲット テーブルに書き出す方法を示しています。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)