Ingesta de datos como tipo variante semiestructurado
Importante
Esta característica está en versión preliminar pública.
En Databricks Runtime 15.3 y versiones posteriores, puede usar el tipo VARIANT
para ingerir datos semiestructurados. En este artículo se describe el comportamiento y se proporciona patrones de ejemplo para la ingesta de datos desde el almacenamiento de objetos en la nube mediante Auto Loader y COPY INTO
, la transmisión de registros desde Kafka, y comandos SQL para crear nuevas tablas con datos variantes o insertar nuevos registros usando el tipo variante.
Consulte Consulta de datos de variante.
Creación de una tabla con una columna variante
VARIANT
es un tipo SQL estándar en Databricks Runtime 15.3 y versiones posteriores y compatibles con tablas respaldadas por Delta Lake. Las tablas administradas en Azure Databricks usan Delta Lake de forma predeterminada, por lo que puede crear una tabla vacía con una sola columna VARIANT
mediante la siguiente sintaxis:
CREATE TABLE table_name (variant_column VARIANT)
Como alternativa, puede usar la función PARSE_JSON
en una cadena JSON para usar una instrucción CTAS para crear una tabla con una columna variante. En el siguiente ejemplo se crea una tabla con dos columnas:
- Columna
id
extraída de la cadena JSON como un tipoSTRING
. - La columna
variant_column
contiene toda la cadena JSON codificada como tipoVARIANT
.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Nota:
Databricks recomienda extraer y almacenar como columnas no variantes los campos que piense usar para acelerar las consultas y optimizar la disposición del almacenamiento.
Las columnas VARIANT
no se pueden usar para las claves de agrupación en clústeres, las particiones ni las claves de orden Z. No se puede usar el tipo de datos VARIANT
para comparaciones, agrupación, ordenación y establecimiento de operaciones. Para obtener una lista completa de las limitaciones, consulte Limitaciones.
Inserción de datos mediante parse_json
Si la tabla de destino ya contiene una columna codificada como VARIANT
, puede usar parse_json
para insertar registros de cadena JSON como VARIANT
, como en el siguiente ejemplo:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Ingesta de datos del almacenamiento de objetos en la nube como variante
En Databricks Runtime 15.3 y versiones posteriores, puede usar Auto Loader para cargar todos los datos de orígenes JSON como una sola columna VARIANT
en una tabla de destino. Dado que VARIANT
es flexible para los cambios de esquema y tipo y mantiene la distinción entre mayúsculas y minúsculas y los valores NULL
presentes en el origen de datos, este patrón es sólido para la mayoría de los escenarios de ingesta con las siguientes advertencias:
- Los registros JSON con formato incorrecto no se pueden codificar mediante tipo
VARIANT
. - El tipo
VARIANT
solo puede contener registros de hasta 16 mb de tamaño.
Nota:
Variante trata registros demasiado grandes similares a los registros dañados. En el modo de procesamiento predeterminado PERMISSIVE
, los registros demasiado grandes se capturan en la columna _malformed_data
junto con registros JSON con formato incorrecto.
Dado que todos los datos del origen JSON se registran como una sola columna VARIANT
, no se produce ninguna evolución del esquema durante la ingesta y rescuedDataColumn
no se admite. En el siguiente ejemplo se supone que la tabla de destino ya existe con una sola columna VARIANT
.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
También puede especificar VARIANT
al definir un esquema o pasar schemaHints
. Los datos del campo de origen al que se hace referencia deben contener una cadena JSON válida. En los ejemplos siguientes se muestra esta sintaxis:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Uso de COPY INTO
con variante
Databricks recomienda usar Auto Loader en COPY INTO
cuando esté disponible.
COPY INTO
admite la ingesta de todo el contenido de un origen de datos JSON como una sola columna. En el siguiente ejemplo se crea una nueva tabla con una sola columna VARIANT
y, a continuación, se usa COPY INTO
para ingerir registros de un origen de archivo JSON.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
También puede definir cualquier campo de una tabla de destino como VARIANT
. Al ejecutar COPY INTO
, los campos correspondientes del origen de datos se ingieren y convierten en tipo VARIANT
, como en los siguientes ejemplos:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Transmisión de datos de Kafka como variante
Muchos flujos de Kafka codifican sus cargas con JSON. La ingesta de flujos de Kafka mediante VARIANT
hace que estas cargas de trabajo sean sólidas para los cambios de esquema.
En el siguiente ejemplo se muestra cómo leer un origen de streaming de Kafka, convertir el key
en un STRING
y el value
en VARIANT
y escribir en una tabla de destino.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)