Gegevens opnemen als semi-gestructureerd varianttype
Belangrijk
Deze functie is beschikbaar als openbare preview.
In Databricks Runtime 15.3 en hoger kunt u het VARIANT
type gebruiken om semi-gestructureerde gegevens op te nemen. In dit artikel wordt het gedrag beschreven en worden voorbeeldpatronen gegeven voor het opnemen van gegevens uit de opslag van cloudobjecten met behulp van automatisch laden en COPY INTO
streamingrecords uit Kafka en SQL-opdrachten voor het maken van nieuwe tabellen met variantgegevens of het invoegen van nieuwe records met behulp van het varianttype.
Zie Queryvariantgegevens.
Een tabel met een variantkolom maken
VARIANT
is een standaard SQL-type in Databricks Runtime 15.3 en hoger en wordt ondersteund door tabellen die worden ondersteund door Delta Lake. Beheerde tabellen in Azure Databricks maken standaard gebruik van Delta Lake, zodat u een lege tabel met één VARIANT
kolom kunt maken met behulp van de volgende syntaxis:
CREATE TABLE table_name (variant_column VARIANT)
U kunt de PARSE_JSON
functie ook gebruiken in een JSON-tekenreeks om een CTAS-instructie te gebruiken om een tabel met een variantkolom te maken. In het volgende voorbeeld wordt een tabel met twee kolommen gemaakt:
- De
id
kolom die uit de JSON-tekenreeks is geëxtraheerd als eenSTRING
type. - De
variant_column
kolom bevat de hele JSON-tekenreeks die is gecodeerd alsVARIANT
type.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Notitie
Databricks raadt aan velden te extraheren en op te slaan als niet-variantkolommen die u wilt gebruiken om query's te versnellen en de opslagindeling te optimaliseren.
VARIANT
kolommen kunnen niet worden gebruikt voor clustersleutels, partities of Z-volgordesleutels. Het VARIANT
gegevenstype kan niet worden gebruikt voor vergelijkingen, groepering, volgorde en setbewerkingen. Zie Beperkingenvoor een volledige lijst met beperkingen.
Gegevens invoegen met behulp van parse_json
Als de doeltabel al een kolom bevat die is gecodeerd, VARIANT
kunt parse_json
u JSON-tekenreeksrecords invoegen als VARIANT
in het volgende voorbeeld:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Gegevens opnemen uit cloudobjectopslag als variant
In Databricks Runtime 15.3 en hoger kunt u Auto Loader gebruiken om alle gegevens uit JSON-bronnen te laden als één VARIANT
kolom in een doeltabel. Omdat VARIANT
het flexibel is voor schema- en typewijzigingen en het onderhouden van hoofdlettergevoeligheid en NULL
waarden in de gegevensbron, is dit patroon robuust voor de meeste opnamescenario's met de volgende opmerkingen:
- Ongeldige JSON-records kunnen niet worden gecodeerd met behulp van
VARIANT
het type. -
VARIANT
type kan alleen records tot 16 mb groot bevatten.
Notitie
Variant behandelt overgrote records die vergelijkbaar zijn met beschadigde records. In de standaardverwerkingsmodus PERMISSIVE
worden te grote records vastgelegd in de _malformed_data
kolom naast ongeldige JSON-records.
Omdat alle gegevens uit de JSON-bron als één VARIANT
kolom worden vastgelegd, vindt er geen schemaontwikkeling plaats tijdens de opname en rescuedDataColumn
wordt deze niet ondersteund. In het volgende voorbeeld wordt ervan uitgegaan dat de doeltabel al bestaat met één VARIANT
kolom.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
U kunt ook opgeven VARIANT
wanneer u een schema definieert of doorgeeft schemaHints
. De gegevens in het bronveld waarnaar wordt verwezen, moeten een geldige JSON-tekenreeks bevatten. In de volgende voorbeelden ziet u deze syntaxis:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Gebruiken COPY INTO
met variant
Databricks raadt het gebruik van automatisch laadprogramma aan COPY INTO
wanneer deze beschikbaar is.
COPY INTO
ondersteunt het opnemen van de volledige inhoud van een JSON-gegevensbron als één kolom. In het volgende voorbeeld wordt een nieuwe tabel met één VARIANT
kolom gemaakt en vervolgens gebruikt COPY INTO
voor het opnemen van records uit een JSON-bestandsbron.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
U kunt ook elk veld in een doeltabel definiëren als VARIANT
. Wanneer u uitvoert COPY INTO
, worden de bijbehorende velden in de gegevensbron opgenomen en gecast om te VARIANT
typen, zoals in de volgende voorbeelden:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Kafka-gegevens streamen als variant
Veel Kafka-streams coderen hun nettoladingen met behulp van JSON. Het opnemen van Kafka-streams maakt VARIANT
deze workloads robuust voor schemawijzigingen.
In het volgende voorbeeld ziet u hoe u een Kafka-streamingbron leest, hoe u de key
bron gieten als STRING
en als value
VARIANT
, en schrijft naar een doeltabel.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)