Delen via


Gegevens opnemen als semi-gestructureerd varianttype

Belangrijk

Deze functie is beschikbaar als openbare preview.

In Databricks Runtime 15.3 en hoger kunt u het VARIANT type gebruiken om semi-gestructureerde gegevens op te nemen. In dit artikel wordt het gedrag beschreven en worden voorbeeldpatronen gegeven voor het opnemen van gegevens uit de opslag van cloudobjecten met behulp van automatisch laden en COPY INTOstreamingrecords uit Kafka en SQL-opdrachten voor het maken van nieuwe tabellen met variantgegevens of het invoegen van nieuwe records met behulp van het varianttype.

Zie Queryvariantgegevens.

Een tabel met een variantkolom maken

VARIANT is een standaard SQL-type in Databricks Runtime 15.3 en hoger en wordt ondersteund door tabellen die worden ondersteund door Delta Lake. Beheerde tabellen in Azure Databricks maken standaard gebruik van Delta Lake, zodat u een lege tabel met één VARIANT kolom kunt maken met behulp van de volgende syntaxis:

CREATE TABLE table_name (variant_column VARIANT)

U kunt de PARSE_JSON functie ook gebruiken in een JSON-tekenreeks om een CTAS-instructie te gebruiken om een tabel met een variantkolom te maken. In het volgende voorbeeld wordt een tabel met twee kolommen gemaakt:

  • De id kolom die uit de JSON-tekenreeks is geëxtraheerd als een STRING type.
  • De variant_column kolom bevat de hele JSON-tekenreeks die is gecodeerd als VARIANT type.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Notitie

Databricks raadt aan velden te extraheren en op te slaan als niet-variantkolommen die u wilt gebruiken om query's te versnellen en de opslagindeling te optimaliseren.

VARIANT kolommen kunnen niet worden gebruikt voor clustersleutels, partities of Z-volgordesleutels. Het VARIANT gegevenstype kan niet worden gebruikt voor vergelijkingen, groepering, volgorde en setbewerkingen. Zie Beperkingenvoor een volledige lijst met beperkingen.

Gegevens invoegen met behulp van parse_json

Als de doeltabel al een kolom bevat die is gecodeerd, VARIANTkunt parse_json u JSON-tekenreeksrecords invoegen als VARIANTin het volgende voorbeeld:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Gegevens opnemen uit cloudobjectopslag als variant

In Databricks Runtime 15.3 en hoger kunt u Auto Loader gebruiken om alle gegevens uit JSON-bronnen te laden als één VARIANT kolom in een doeltabel. Omdat VARIANT het flexibel is voor schema- en typewijzigingen en het onderhouden van hoofdlettergevoeligheid en NULL waarden in de gegevensbron, is dit patroon robuust voor de meeste opnamescenario's met de volgende opmerkingen:

  • Ongeldige JSON-records kunnen niet worden gecodeerd met behulp van VARIANT het type.
  • VARIANT type kan alleen records tot 16 mb groot bevatten.

Notitie

Variant behandelt overgrote records die vergelijkbaar zijn met beschadigde records. In de standaardverwerkingsmodus PERMISSIVE worden te grote records vastgelegd in de _malformed_data kolom naast ongeldige JSON-records.

Omdat alle gegevens uit de JSON-bron als één VARIANT kolom worden vastgelegd, vindt er geen schemaontwikkeling plaats tijdens de opname en rescuedDataColumn wordt deze niet ondersteund. In het volgende voorbeeld wordt ervan uitgegaan dat de doeltabel al bestaat met één VARIANT kolom.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

U kunt ook opgeven VARIANT wanneer u een schema definieert of doorgeeft schemaHints. De gegevens in het bronveld waarnaar wordt verwezen, moeten een geldige JSON-tekenreeks bevatten. In de volgende voorbeelden ziet u deze syntaxis:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Gebruiken COPY INTO met variant

Databricks raadt het gebruik van automatisch laadprogramma aan COPY INTO wanneer deze beschikbaar is.

COPY INTO ondersteunt het opnemen van de volledige inhoud van een JSON-gegevensbron als één kolom. In het volgende voorbeeld wordt een nieuwe tabel met één VARIANT kolom gemaakt en vervolgens gebruikt COPY INTO voor het opnemen van records uit een JSON-bestandsbron.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

U kunt ook elk veld in een doeltabel definiëren als VARIANT. Wanneer u uitvoert COPY INTO, worden de bijbehorende velden in de gegevensbron opgenomen en gecast om te VARIANT typen, zoals in de volgende voorbeelden:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Kafka-gegevens streamen als variant

Veel Kafka-streams coderen hun nettoladingen met behulp van JSON. Het opnemen van Kafka-streams maakt VARIANT deze workloads robuust voor schemawijzigingen.

In het volgende voorbeeld ziet u hoe u een Kafka-streamingbron leest, hoe u de key bron gieten als STRING en als valueVARIANT, en schrijft naar een doeltabel.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)