Erfassen von Daten als halbstrukturierter Variantentyp
Wichtig
Dieses Feature befindet sich in der Public Preview.
In Databricks Runtime 15.3 und höher können Sie den VARIANT
Typ verwenden, um halbstrukturierte Daten zu erfassen. In diesem Artikel werden das Verhalten beschrieben und Beispielmuster zum Aufnehmen von Daten aus dem Cloudobjektspeicher mithilfe von AutoLadeprogramm und COPY INTO
, Streamingdatensätzen von Kafka und SQL-Befehlen zum Erstellen neuer Tabellen mit Variantendaten oder Einfügen neuer Datensätze mithilfe des Variantentyps bereitgestellt.
Siehe Abfragevariantendaten.
Erstellen einer Tabelle mit einer Variant-Spalte
VARIANT
ist ein standardmäßiger SQL-Typ in Databricks Runtime 15.3 und höher und wird von Tabellen unterstützt, die von Delta Lake unterstützt werden. Verwaltete Tabellen in Azure Databricks verwenden Delta Lake standardmäßig, sodass Sie eine leere Tabelle mit einer einzelnen VARIANT
Spalte mit der folgenden Syntax erstellen können:
CREATE TABLE table_name (variant_column VARIANT)
Alternativ können Sie die PARSE_JSON
Funktion in einer JSON-Zeichenfolge verwenden, um eine CTAS-Anweisung zum Erstellen einer Tabelle mit einer Variant-Spalte zu verwenden. Im folgenden Beispiel wird eine Tabelle mit zwei Spalten erstellt:
- Die aus der JSON-Zeichenfolge extrahierte
id
Spalte alsSTRING
Typ. - Die
variant_column
Spalte enthält die gesamte JSON-Zeichenfolge, die alsVARIANT
Typ codiert ist.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Hinweis
VARIANT
Spalten können nicht für Clusterschlüssel, Partitionen oder Z-Reihenfolge-Schlüssel verwendet werden. Daten, die mit VARIANT
dem Typ gespeichert sind, können nicht für Vergleiche und Sortierungen verwendet werden.
Databricks empfiehlt das Extrahieren und Speichern von Feldern als Nicht-Variant-Spalten, die Sie verwenden möchten, um Abfragen zu beschleunigen und das Speicherlayout zu optimieren.
Einfügen von Daten mithilfe von parse_json
Wenn die Zieltabelle bereits eine Spalte codiert als VARIANT
enthält, können Sie parse_json
benutzen, um JSON-Zeichenfolgeneinträge als VARIANT
einzufügen, wie im folgenden Beispiel:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Erfassen von Daten aus Cloudobjektspeicher als Variante
In Databricks Runtime 15.3 und höher können Sie Auto Loader verwenden, um alle Daten aus JSON-Quellen als einzelne VARIANT
Spalte in einer Zieltabelle zu laden. Da VARIANT
flexibel für Schema- und Typänderungen ist und die Groß-/Kleinschreibung und NULL
-Werte in der Datenquelle verwaltet, ist dieses Muster robust für die meisten Aufnahmeszenarien mit den folgenden Einschränkungen:
- Falsch formatierte JSON-Datensätze können nicht mit
VARIANT
typcodiert werden. - Der
VARIANT
-Typ kann Datensätze nur bis zu 16 MB groß halten.
Hinweis
Variant behandelt übermäßig große Datensätze, die mit beschädigten Datensätzen vergleichbar sind. Im Standardverarbeitungsmodus PERMISSIVE
werden übermäßig große Datensätze zusammen mit falsch formatierten JSON-Datensätzen in der _malformed_data
Spalte erfasst.
Da alle Daten aus der JSON-Quelle als einzelne VARIANT
-Spalte aufgezeichnet werden, tritt während der Aufnahme keine Schemaentwicklung auf und rescuedDataColumn
wird nicht unterstützt. Im folgenden Beispiel wird davon ausgegangen, dass die Zieltabelle bereits mit einer einzelnen VARIANT
Spalte vorhanden ist.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Sie können auch VARIANT
angeben, wenn Sie ein Schema definieren oder schemaHints
übergeben. Die Daten im Referenzquellfeld müssen eine gültige JSON-Zeichenfolge enthalten. Die folgenden Beispiele veranschaulichen diese Syntax:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Verwendung von COPY INTO
mit Variante
Databricks empfiehlt die Verwendung von Auto Loader anstelle von COPY INTO
, falls vorhanden.
COPY INTO
unterstützt das Aufnehmen des gesamten Inhalts einer JSON-Datenquelle als einzelne Spalte. Im folgenden Beispiel wird eine neue Tabelle mit einer einzelnen VARIANT
Spalte erstellt und dann zum Erfassen von Datensätzen aus einer JSON-Dateiquelle COPY INTO
verwendet.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Sie können auch ein beliebiges Feld in einer Zieltabelle als VARIANT
definieren. Wenn Sie COPY INTO
ausführen, werden die entsprechenden Felder in der Datenquelle erfasst und als VARIANT
-Typ umgewandelt, wie in den folgenden Beispielen:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Stream Kafka-Daten als Variante
Viele Kafka-Streams codieren ihre Nutzlasten mithilfe von JSON. Das Aufnehmen von Kafka-Datenströmen mithilfe von VARIANT
macht diese Workloads robust gegenüber Schemaänderungen.
Das folgende Beispiel veranschaulicht das Lesen einer Kafka-Streamingquelle, das Umwandeln des key
-Typs als STRING
und value
als VARIANT
und das Schreiben in eine Zieltabelle.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)