Dela via


Mata in data som halvstrukturerad varianttyp

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

I Databricks Runtime 15.3 och senare kan du använda VARIANT typen för att mata in halvstrukturerade data. Den här artikeln beskriver beteende och innehåller exempelmönster för inmatning av data från molnobjektlagring med hjälp av Auto Loader och COPY INTO, strömmande poster från Kafka och SQL-kommandon för att skapa nya tables med variantdata eller infoga nya poster med hjälp av varianttypen.

Se Frågevariantdata.

Skapa en table med en variant column

VARIANT är en SQL-standardtyp i Databricks Runtime 15.3 och senare och stöds av tables som backas upp av Delta Lake. Hanterade tables i Azure Databricks använder Delta Lake som standard, så att du kan skapa en tom table med en enda VARIANTcolumn med hjälp av följande syntax:

CREATE TABLE table_name (variant_column VARIANT)

Alternativt kan du använda funktionen PARSE_JSON på en JSON-sträng för att använda en CTAS-instruktion för att skapa en table med en variant column. I följande exempel skapas en table med två columns:

  • Den idcolumn som extraherats från JSON-strängen är av STRING typ.
  • variant_column column innehåller hela JSON-strängen som kodas som VARIANT typ.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Kommentar

Databricks rekommenderar att du extraherar och lagrar fält som icke-variant columns som du planerar att använda för att påskynda frågor och optimize lagringslayout.

VARIANT columns kan inte användas för klustring av nycklar, partitioner eller Z-ordernycklar. Den VARIANT datatypen kan inte användas för jämförelser, gruppering, ordning och set åtgärder. En fullständig list av begränsningar finns i Begränsningar.

Använda Insert-data med parse_json

Om målet table redan innehåller en column, kodad som VARIANT, kan du använda parse_json för att insert JSON-strängposter som VARIANT, som i följande exempel:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Mata in data från molnobjektlagring som variant

I Databricks Runtime 15.3 och senare kan du använda Auto Loader för att läsa in all data från JSON-källor som en enda VARIANTcolumn i ett mål table. Eftersom VARIANT är flexibelt för schema och typändringar och upprätthåller skiftlägeskänslighet samt behåller NULLvalues som finns i datakällan, är det här mönstret robust i de flesta inmatningsscenarier med följande varningar:

  • Felaktiga JSON-poster kan inte kodas med hjälp av VARIANT typen .
  • VARIANT -typen kan bara innehålla poster med en storlek på upp till 16 mb.

Kommentar

Variant behandlar alltför stora poster som liknar skadade poster. I standardbearbetningsläget PERMISSIVE samlas alltför stora poster in i _malformed_datacolumn tillsammans med felaktiga JSON-poster.

Eftersom alla data från JSON-källan registreras som en enda VARIANTcolumn, sker ingen schema utveckling under inmatning och rescuedDataColumn stöds inte. I följande exempel förutsätts att målet table redan finns med en enda VARIANTcolumn.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Du kan också ange VARIANT när du definierar en schema eller skickar schemaHints. Data i det refererade källfältet måste innehålla en giltig JSON-sträng. Följande exempel visar den här syntaxen:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Använd COPY INTO med variant

Databricks rekommenderar att du använder Auto Loader över COPY INTO när det är tillgängligt.

COPY INTO stöder inmatning av hela innehållet i en JSON-datakälla som en enda column. I följande exempel skapas en ny table med en enda VARIANTcolumn, och sedan används COPY INTO för att mata in poster från en JSON-filkälla.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Du kan också definiera valfritt fält i ett mål table som VARIANT. När du kör COPY INTOmatas motsvarande fält i datakällan in och castas till VARIANT typ, som i följande exempel:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Strömma Kafka-data som variant

Många Kafka-strömmar kodar sina nyttolaster med JSON. Bearbetning av Kafka-strömmar med hjälp av VARIANT gör dessa arbeten robusta mot schema ändringar.

I följande exempel visas hur du läser en Kafka-strömningskälla, omvandlar key till en STRING och value till VARIANT, och skriver ut till en målplats table.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)