Mata in data som halvstrukturerad varianttyp
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
I Databricks Runtime 15.3 och senare kan du använda VARIANT
typen för att mata in halvstrukturerade data. Den här artikeln beskriver beteende och innehåller exempelmönster för inmatning av data från molnobjektlagring med hjälp av Auto Loader och COPY INTO
, strömmande poster från Kafka och SQL-kommandon för att skapa nya tables med variantdata eller infoga nya poster med hjälp av varianttypen.
Se Frågevariantdata.
Skapa en table med en variant column
VARIANT
är en SQL-standardtyp i Databricks Runtime 15.3 och senare och stöds av tables som backas upp av Delta Lake. Hanterade tables i Azure Databricks använder Delta Lake som standard, så att du kan skapa en tom table med en enda VARIANT
column med hjälp av följande syntax:
CREATE TABLE table_name (variant_column VARIANT)
Alternativt kan du använda funktionen PARSE_JSON
på en JSON-sträng för att använda en CTAS-instruktion för att skapa en table med en variant column. I följande exempel skapas en table med två columns:
- Den
id
column som extraherats från JSON-strängen är avSTRING
typ. -
variant_column
column innehåller hela JSON-strängen som kodas somVARIANT
typ.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Kommentar
Databricks rekommenderar att du extraherar och lagrar fält som icke-variant columns som du planerar att använda för att påskynda frågor och optimize lagringslayout.
VARIANT
columns kan inte användas för klustring av nycklar, partitioner eller Z-ordernycklar. Den VARIANT
datatypen kan inte användas för jämförelser, gruppering, ordning och set åtgärder. En fullständig list av begränsningar finns i Begränsningar.
Använda Insert-data med parse_json
Om målet table redan innehåller en column, kodad som VARIANT
, kan du använda parse_json
för att insert JSON-strängposter som VARIANT
, som i följande exempel:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Mata in data från molnobjektlagring som variant
I Databricks Runtime 15.3 och senare kan du använda Auto Loader för att läsa in all data från JSON-källor som en enda VARIANT
column i ett mål table. Eftersom VARIANT
är flexibelt för schema och typändringar och upprätthåller skiftlägeskänslighet samt behåller NULL
values som finns i datakällan, är det här mönstret robust i de flesta inmatningsscenarier med följande varningar:
- Felaktiga JSON-poster kan inte kodas med hjälp av
VARIANT
typen . -
VARIANT
-typen kan bara innehålla poster med en storlek på upp till 16 mb.
Kommentar
Variant behandlar alltför stora poster som liknar skadade poster. I standardbearbetningsläget PERMISSIVE
samlas alltför stora poster in i _malformed_data
column tillsammans med felaktiga JSON-poster.
Eftersom alla data från JSON-källan registreras som en enda VARIANT
column, sker ingen schema utveckling under inmatning och rescuedDataColumn
stöds inte. I följande exempel förutsätts att målet table redan finns med en enda VARIANT
column.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Du kan också ange VARIANT
när du definierar en schema eller skickar schemaHints
. Data i det refererade källfältet måste innehålla en giltig JSON-sträng. Följande exempel visar den här syntaxen:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Använd COPY INTO
med variant
Databricks rekommenderar att du använder Auto Loader över COPY INTO
när det är tillgängligt.
COPY INTO
stöder inmatning av hela innehållet i en JSON-datakälla som en enda column. I följande exempel skapas en ny table med en enda VARIANT
column, och sedan används COPY INTO
för att mata in poster från en JSON-filkälla.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Du kan också definiera valfritt fält i ett mål table som VARIANT
. När du kör COPY INTO
matas motsvarande fält i datakällan in och castas till VARIANT
typ, som i följande exempel:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Strömma Kafka-data som variant
Många Kafka-strömmar kodar sina nyttolaster med JSON. Bearbetning av Kafka-strömmar med hjälp av VARIANT
gör dessa arbeten robusta mot schema ändringar.
I följande exempel visas hur du läser en Kafka-strömningskälla, omvandlar key
till en STRING
och value
till VARIANT
, och skriver ut till en målplats table.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)