Inserire dati come tipo variant semistrutturato
Importante
Questa funzionalità è disponibile in anteprima pubblica.
In Databricks Runtime 15.3 e versioni successive è possibile usare il VARIANT
tipo per inserire dati semistrutturati. Questo articolo descrive il comportamento e fornisce modelli di esempio per l'inserimento di dati dall'archiviazione di oggetti cloud tramite il caricatore automatico e COPY INTO
, lo streaming di record da Kafka e i comandi SQL per la creazione di nuove tabelle con dati variant o l'inserimento di nuovi record usando il tipo variant.
Vedere Eseguire query sui dati varianti.
Creare una tabella con una colonna variante
VARIANT
è un tipo SQL standard in Databricks Runtime 15.3 e versioni successive e supportato dalle tabelle supportate da Delta Lake. Per impostazione predefinita, le tabelle gestite in Azure Databricks usano Delta Lake, quindi è possibile creare una tabella vuota con una singola VARIANT
colonna usando la sintassi seguente:
CREATE TABLE table_name (variant_column VARIANT)
In alternativa, è possibile usare la PARSE_JSON
funzione in una stringa JSON per usare un'istruzione CTAS per creare una tabella con una colonna variante. Nell'esempio seguente viene creata una tabella con due colonne:
- Colonna
id
estratta dalla stringa JSON comeSTRING
tipo. - La
variant_column
colonna contiene l'intera stringa JSON codificata comeVARIANT
tipo.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Nota
Databricks consiglia di estrarre e archiviare i campi come colonne non varianti che si prevede di usare per accelerare le query e ottimizzare il layout di archiviazione.
VARIANT
non è possibile usare colonne per il clustering di chiavi, partizioni o chiavi con ordine Z. Non è possibile utilizzare il tipo di dati VARIANT
per le operazioni di confronto, raggruppamento, ordinamento e set. Per un elenco completo delle limitazioni, vedere limitazioni .
Inserire dati usando parse_json
Se la tabella di destinazione contiene già una colonna codificata come VARIANT
, è possibile usare parse_json
per inserire record stringa JSON come VARIANT
, come nell'esempio seguente:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Inserire dati dall'archiviazione di oggetti cloud come variante
In Databricks Runtime 15.3 e versioni successive è possibile usare il caricatore automatico per caricare tutti i dati da origini JSON come singola VARIANT
colonna in una tabella di destinazione. Poiché VARIANT
è flessibile per la modifica dello schema e del tipo e mantiene la distinzione tra maiuscole e minuscole e NULL
i valori presenti nell'origine dati, questo modello è affidabile per la maggior parte degli scenari di inserimento con le avvertenze seguenti:
- I record JSON in formato non valido non possono essere codificati usando
VARIANT
il tipo . -
VARIANT
il tipo può contenere solo record di dimensioni fino a 16 MB.
Nota
Variant considera record eccessivamente di grandi dimensioni simili ai record danneggiati. Nella modalità di elaborazione predefinita PERMISSIVE
, i record eccessivamente grandi vengono acquisiti nella _malformed_data
colonna insieme a record JSON in formato non valido.
Poiché tutti i dati dell'origine JSON vengono registrati come una singola VARIANT
colonna, non viene eseguita alcuna evoluzione dello schema durante l'inserimento e rescuedDataColumn
non è supportata. Nell'esempio seguente si presuppone che la tabella di destinazione esista già con una singola VARIANT
colonna.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
È anche possibile specificare VARIANT
quando si definisce uno schema o si schemaHints
passa . I dati nel campo di origine a cui si fa riferimento devono contenere una stringa JSON valida. Gli esempi seguenti illustrano questa sintassi:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Usare COPY INTO
con variant
Databricks consiglia di usare il caricatore automatico quando COPY INTO
disponibile.
COPY INTO
supporta l'inserimento dell'intero contenuto di un'origine dati JSON come singola colonna. L'esempio seguente crea una nuova tabella con una singola VARIANT
colonna e quindi usa COPY INTO
per inserire record da un'origine file JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
È anche possibile definire qualsiasi campo in una tabella di destinazione come VARIANT
. Quando si esegue COPY INTO
, i campi corrispondenti nell'origine dati vengono inseriti ed eseguiti il cast al VARIANT
tipo, come negli esempi seguenti:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Trasmettere dati Kafka come variant
Molti flussi Kafka codificano i payload usando JSON. L'inserimento di flussi Kafka tramite VARIANT
rende questi carichi di lavoro affidabili per le modifiche dello schema.
Nell'esempio seguente viene illustrata la lettura di un'origine di streaming Kafka, il key
cast di come STRING
e value
come VARIANT
e la scrittura in una tabella di destinazione.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)