Freigeben über


Erfassen von Daten als halbstrukturierter Variantentyp

Wichtig

Dieses Feature befindet sich in der Public Preview.

In Databricks Runtime 15.3 und höher können Sie den VARIANT Typ verwenden, um halbstrukturierte Daten zu erfassen. In diesem Artikel werden das Verhalten beschrieben und Beispielmuster zum Aufnehmen von Daten aus dem Cloudobjektspeicher mithilfe von AutoLadeprogramm und COPY INTO, Streamingdatensätzen von Kafka und SQL-Befehlen zum Erstellen neuer Tabellen mit Variantendaten oder Einfügen neuer Datensätze mithilfe des Variantentyps bereitgestellt.

Siehe Abfragevariantendaten.

Erstellen einer Tabelle mit einer Variant-Spalte

VARIANT ist ein standardmäßiger SQL-Typ in Databricks Runtime 15.3 und höher und wird von Tabellen unterstützt, die von Delta Lake unterstützt werden. Verwaltete Tabellen in Azure Databricks verwenden Delta Lake standardmäßig, sodass Sie eine leere Tabelle mit einer einzelnen VARIANT Spalte mit der folgenden Syntax erstellen können:

CREATE TABLE table_name (variant_column VARIANT)

Alternativ können Sie die PARSE_JSON Funktion in einer JSON-Zeichenfolge verwenden, um eine CTAS-Anweisung zum Erstellen einer Tabelle mit einer Variant-Spalte zu verwenden. Im folgenden Beispiel wird eine Tabelle mit zwei Spalten erstellt:

  • Die aus der JSON-Zeichenfolge extrahierte id Spalte als STRING Typ.
  • Die variant_column Spalte enthält die gesamte JSON-Zeichenfolge, die als VARIANT Typ codiert ist.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Hinweis

VARIANT Spalten können nicht für Clusterschlüssel, Partitionen oder Z-Reihenfolge-Schlüssel verwendet werden. Daten, die mit VARIANT dem Typ gespeichert sind, können nicht für Vergleiche und Sortierungen verwendet werden.

Databricks empfiehlt das Extrahieren und Speichern von Feldern als Nicht-Variant-Spalten, die Sie verwenden möchten, um Abfragen zu beschleunigen und das Speicherlayout zu optimieren.

Einfügen von Daten mithilfe von parse_json

Wenn die Zieltabelle bereits eine Spalte codiert als VARIANT enthält, können Sie parse_json benutzen, um JSON-Zeichenfolgeneinträge als VARIANT einzufügen, wie im folgenden Beispiel:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Erfassen von Daten aus Cloudobjektspeicher als Variante

In Databricks Runtime 15.3 und höher können Sie Auto Loader verwenden, um alle Daten aus JSON-Quellen als einzelne VARIANT Spalte in einer Zieltabelle zu laden. Da VARIANT flexibel für Schema- und Typänderungen ist und die Groß-/Kleinschreibung und NULL-Werte in der Datenquelle verwaltet, ist dieses Muster robust für die meisten Aufnahmeszenarien mit den folgenden Einschränkungen:

  • Falsch formatierte JSON-Datensätze können nicht mit VARIANT typcodiert werden.
  • Der VARIANT-Typ kann Datensätze nur bis zu 16 MB groß halten.

Hinweis

Variant behandelt übermäßig große Datensätze, die mit beschädigten Datensätzen vergleichbar sind. Im Standardverarbeitungsmodus PERMISSIVE werden übermäßig große Datensätze zusammen mit falsch formatierten JSON-Datensätzen in der _malformed_data Spalte erfasst.

Da alle Daten aus der JSON-Quelle als einzelne VARIANT-Spalte aufgezeichnet werden, tritt während der Aufnahme keine Schemaentwicklung auf und rescuedDataColumn wird nicht unterstützt. Im folgenden Beispiel wird davon ausgegangen, dass die Zieltabelle bereits mit einer einzelnen VARIANT Spalte vorhanden ist.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Sie können auch VARIANT angeben, wenn Sie ein Schema definieren oder schemaHints übergeben. Die Daten im Referenzquellfeld müssen eine gültige JSON-Zeichenfolge enthalten. Die folgenden Beispiele veranschaulichen diese Syntax:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Verwendung von COPY INTO mit Variante

Databricks empfiehlt die Verwendung von Auto Loader anstelle von COPY INTO, falls vorhanden.

COPY INTO unterstützt das Aufnehmen des gesamten Inhalts einer JSON-Datenquelle als einzelne Spalte. Im folgenden Beispiel wird eine neue Tabelle mit einer einzelnen VARIANT Spalte erstellt und dann zum Erfassen von Datensätzen aus einer JSON-Dateiquelle COPY INTO verwendet.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Sie können auch ein beliebiges Feld in einer Zieltabelle als VARIANT definieren. Wenn Sie COPY INTO ausführen, werden die entsprechenden Felder in der Datenquelle erfasst und als VARIANT-Typ umgewandelt, wie in den folgenden Beispielen:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Stream Kafka-Daten als Variante

Viele Kafka-Streams codieren ihre Nutzlasten mithilfe von JSON. Das Aufnehmen von Kafka-Datenströmen mithilfe von VARIANT macht diese Workloads robust gegenüber Schemaänderungen.

Das folgende Beispiel veranschaulicht das Lesen einer Kafka-Streamingquelle, das Umwandeln des key-Typs als STRING und value als VARIANT und das Schreiben in eine Zieltabelle.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)