Ingerir dados como tipo de variante semiestruturada
Importante
Esta funcionalidade está em Pré-visualização Pública.
No Databricks Runtime 15.3 e superior, você pode usar o VARIANT
tipo para ingerir dados semiestruturados. Este artigo descreve o comportamento e fornece padrões de exemplo para a ingestão de dados do armazenamento de objetos na nuvem usando o Auto Loader e COPY INTO
, streaming de registros do Kafka e comandos SQL para criar novas tabelas com dados de variantes ou inserir novos registros usando o tipo de variante.
Consulte Dados de variantes de consulta.
Criar uma tabela com uma coluna variante
VARIANT
é um tipo SQL padrão no Databricks Runtime 15.3 e superior e suportado por tabelas apoiadas pelo Delta Lake. As tabelas gerenciadas no Azure Databricks usam o Delta Lake por padrão, para que você possa criar uma tabela vazia com uma única VARIANT
coluna usando a seguinte sintaxe:
CREATE TABLE table_name (variant_column VARIANT)
Como alternativa, você pode usar a PARSE_JSON
função em uma cadeia de caracteres JSON para usar uma instrução CTAS para criar uma tabela com uma coluna variante. O exemplo a seguir cria uma tabela com duas colunas:
- A
id
coluna extraída da cadeia de caracteres JSON como umSTRING
tipo. - A
variant_column
coluna contém toda a cadeia de caracteres JSON codificada comoVARIANT
tipo.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Nota
VARIANT
as colunas não podem ser usadas para agrupar chaves, partições ou chaves de ordem Z. Os dados armazenados com VARIANT
o tipo não podem ser usados para comparações e ordenações.
O Databricks recomenda extrair e armazenar campos como colunas não variantes que você planeja usar para acelerar consultas e otimizar o layout de armazenamento.
Inserir dados usando parse_json
Se a tabela de destino já contiver uma coluna codificada como VARIANT
, você poderá usar parse_json
para inserir registros de cadeia de caracteres JSON como VARIANT
, como no exemplo a seguir:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Ingerir dados do armazenamento de objetos na nuvem como variante
No Databricks Runtime 15.3 e superior, você pode usar o Auto Loader para carregar todos os dados de fontes JSON como uma única VARIANT
coluna em uma tabela de destino. Como VARIANT
é flexível para alterações de esquema e tipo e mantém a diferenciação de maiúsculas e minúsculas e NULL
os valores presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão com as seguintes advertências:
- Registros JSON malformados não podem ser codificados usando
VARIANT
tipo. VARIANT
O tipo só pode conter registros de até 16 MB de tamanho.
Nota
Variant trata registros excessivamente grandes semelhantes a registros corrompidos. No modo de processamento padrão PERMISSIVE
, registros excessivamente grandes são capturados na coluna ao lado de _malformed_data
registros JSON malformados.
Como todos os dados da fonte JSON são registrados como uma única VARIANT
coluna, nenhuma evolução do esquema ocorre durante a ingestão e rescuedDataColumn
não é suportada. O exemplo a seguir pressupõe que a tabela de destino já existe com uma única VARIANT
coluna.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Você também pode especificar VARIANT
ao definir um esquema ou passar schemaHints
. Os dados no campo de origem referenciada devem conter uma cadeia de caracteres JSON válida. Os exemplos a seguir demonstram essa sintaxe:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Utilização COPY INTO
com variante
A Databricks recomenda o uso do Auto Loader quando COPY INTO
disponível.
COPY INTO
suporta a ingestão de todo o conteúdo de uma fonte de dados JSON como uma única coluna. O exemplo a seguir cria uma nova tabela com uma única VARIANT
coluna e, em seguida, usa COPY INTO
para ingerir registros de uma fonte de arquivo JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Você também pode definir qualquer campo em uma tabela de destino como VARIANT
. Quando você executa COPY INTO
o , os campos correspondentes na fonte de dados são ingeridos e convertidos para VARIANT
digitar, como nos exemplos a seguir:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Stream Kafka dados como variante
Muitos fluxos Kafka codificam suas cargas usando JSON. A ingestão de fluxos Kafka usando VARIANT
torna essas cargas de trabalho robustas para alterações de esquema.
O exemplo a seguir demonstra a leitura de uma fonte de streaming Kafka, a conversão do key
como a STRING
e o como VARIANT
e a value
e a gravação em uma tabela de destino.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)