Compartilhar via


Ingerir dados como tipo de variante semiestruturada

Importante

Esse recurso está em uma versão prévia.

No Databricks Runtime 15.3 e superior, você pode usar o tipo VARIANT para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para ingerir dados do armazenamento de objetos de nuvem usando o Carregador Automático e COPY INTO, os registros de streaming do Kafka e comandos SQL para criar novas tabelas com dados variantes ou inserir novos registros usando o tipo variante.

Consulte os Dados de variante de consulta.

Criar uma tabela com uma coluna variante

VARIANT é um tipo SQL padrão no Databricks Runtime 15.3 e superior e suportado por tabelas apoiadas pelo Delta Lake. As tabelas gerenciadas no Azure Databricks usam o Delta Lake por padrão, para que você possa criar uma tabela vazia com uma única coluna VARIANT usando a seguinte sintaxe:

CREATE TABLE table_name (variant_column VARIANT)

Como alternativa, você pode usar a função PARSE_JSON em uma cadeia de caracteres JSON para usar uma instrução CTAS e criar uma tabela com uma coluna variante. O exemplo a seguir cria uma tabela com duas colunas:

  • A coluna id extraída da cadeia de caracteres JSON como um tipo STRING.
  • A coluna variant_column contém toda a cadeia de caracteres JSON codificada como tipo VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Observação

As colunas VARIANT não podem ser usadas para chaves de clustering, partições ou chaves de ordem Z. Os dados armazenados com o tipo VARIANT não podem ser usados para comparações e ordenações.

O Databricks recomenda extrair e armazenar campos como colunas não variantes que você planeja usar para acelerar consultas e otimizar o layout de armazenamento.

Inserir dados usando parse_json

Se a tabela de destino já contiver uma coluna codificada como VARIANT, você poderá usar parse_json para inserir registros de cadeia de caracteres JSON como VARIANT, como no exemplo a seguir:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Ingerir dados do armazenamento de objetos de nuvem como variante

No Databricks Runtime 15.3 e superior, você pode usar o Carregador Automático para carregar todos os dados de fontes JSON como uma única coluna VARIANT em uma tabela de destino. Como VARIANT é flexível para alterações de esquema e tipo e mantém a confidencialidade de maiúsculas e minúsculas e os valores NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão com as seguintes ressalvas:

  • Registros JSON malformados não podem ser codificados usando o tipo VARIANT.
  • O tipo VARIANT só pode conter registros de até 16 mb de tamanho.

Observação

A variante trata registros excessivamente da mesma forma que trata registros corrompidos. No modo de processamento PERMISSIVE padrão, registros excessivamente grandes são capturados na coluna _malformed_data juntamente com registros JSON malformados.

Como todos os dados da fonte JSON são registrados como uma única coluna VARIANT, nenhuma evolução do esquema ocorre durante a ingestão e não há suporte a rescuedDataColumn. O exemplo a seguir pressupõe que a tabela de destino já existe com uma única coluna VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Você também pode especificar VARIANT ao definir um esquema ou passar schemaHints. Os dados no campo de origem referenciado devem conter uma cadeia de caracteres JSON válida. Os exemplos a seguir demonstram essa sintaxe:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Usar COPY INTO com variante

O Databricks recomenda usar o Carregador Automático em COPY INTO quando disponível.

COPY INTO dá suporte à ingestão de todo o conteúdo de uma fonte de dados JSON como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única coluna VARIANT e, em seguida, usa COPY INTO para ingerir registros de uma fonte de arquivo JSON.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Você também pode definir qualquer campo em uma tabela de destino como VARIANT. Quando você executa COPY INTO, os campos correspondentes na fonte de dados são ingeridos e convertidos para o tipo VARIANT, como nos seguintes exemplos:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Transmitir dados do Kafka como variante

Muitos fluxos Kafka codificam suas cargas usando JSON. A ingestão de fluxos Kafka usando VARIANT torna essas cargas de trabalho robustas para alterações de esquema.

O exemplo a seguir demonstra a leitura de uma fonte de streaming do Kafka, lançando key como uma STRING, valuecomo VARIANT e a gravação em uma tabela de destino.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)