Прием данных в виде полуструктурированного типа варианта
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
В Databricks Runtime 15.3 и более поздних версиях можно использовать VARIANT
тип для приема полуструктурированных данных. В этой статье описано поведение и приведены примеры шаблонов приема данных из облачного хранилища объектов с помощью автозагрузчика и COPY INTO
, потоковой передачи записей из Kafka и команд SQL для создания таблиц с вариантными данными или вставки новых записей с помощью типа варианта.
См . сведения о варианте запроса.
Создание таблицы с вариантным столбцом
VARIANT
— это стандартный тип SQL в Databricks Runtime 15.3 и более поздних версий и поддерживается таблицами, поддерживаемыми Delta Lake. Управляемые таблицы в Azure Databricks по умолчанию используют Delta Lake, поэтому можно создать пустую таблицу с одним столбцом VARIANT
с помощью следующего синтаксиса:
CREATE TABLE table_name (variant_column VARIANT)
Кроме того, можно использовать функцию PARSE_JSON
в строке JSON, чтобы использовать инструкцию CTAS для создания таблицы с вариантным столбцом. В следующем примере создается таблица с двумя столбцами:
- Столбец
id
, извлеченный из строки JSON в виде типаSTRING
. - Столбец
variant_column
содержит всю строку JSON, закодированную как типVARIANT
.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Примечание.
Databricks рекомендует извлекать и хранить поля в качестве не вариантных столбцов, которые планируется использовать для ускорения запросов и оптимизации макета хранилища.
VARIANT
столбцы нельзя использовать для кластеризации ключей, секций или ключей порядка Z. Тип данных VARIANT
нельзя использовать для сравнения, группировки, упорядочивания и задания операций. Для получения полного списка ограничений см. Ограничения.
Вставка данных с помощью parse_json
Если целевая таблица уже содержит столбец, закодированный как VARIANT
, можно использовать parse_json
для вставки строковых записей JSON как VARIANT
, как показано в следующем примере:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
Прием данных из облачного хранилища объектов в качестве варианта
В Databricks Runtime 15.3 и более поздних версиях можно использовать автозагрузчик для загрузки всех данных из источников JSON в виде одного столбца VARIANT
в целевой таблице. Так как VARIANT
гибко адаптируется к изменениям схемы и типа и соблюдает чувствительность к регистру, а также учитывает NULL
значения, присутствующие в источнике данных, этот шаблон является надежным для большинства сценариев загрузки данных с учетом следующих предостережений:
- Неправильно сформированные записи JSON не могут быть закодированы с помощью
VARIANT
типа. -
VARIANT
Тип может хранить только записи размером до 16 мб.
Примечание.
Variant обрабатывает слишком большие записи, аналогичные поврежденным записям. В режиме обработки PERMISSIVE
по умолчанию в столбце _malformed_data
наряду с неправильно сформированными записями JSON фиксируются слишком большие записи.
Так как все данные из источника JSON записываются как один столбец VARIANT
, во время приема схемы не происходит эволюция схемы и rescuedDataColumn
не поддерживается. В следующем примере предполагается, что целевая таблица уже существует с одним столбцом VARIANT
.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Можно также указать VARIANT
при определении схемы или передаче schemaHints
. Данные в поле источника, на который ссылается ссылка, должны содержать допустимую строку JSON. В следующих примерах показан этот синтаксис:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
Использование COPY INTO
с вариантом
Databricks рекомендует использовать автозагрузчик COPY INTO
при наличии.
COPY INTO
поддерживает прием всего содержимого источника данных JSON в виде одного столбца. В следующем примере создается новая таблица с одним столбцом VARIANT
, а затем используется COPY INTO
для приема записей из источника JSON-файла.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Вы также можете определить любое поле в целевой таблице как VARIANT
. При запуске COPY INTO
соответствующие поля в источнике данных получаются и приведение к VARIANT
типу, как показано в следующих примерах:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Потоковая передача данных Kafka в качестве варианта
Многие потоки Kafka кодируют полезные данные с помощью JSON. Прием потоков Kafka с помощью VARIANT
делает эти рабочие нагрузки надежными для изменений схемы.
В следующем примере показано чтение источника потоковой передачи Kafka, приведение key
в качестве STRING
и value
как VARIANT
и запись в целевую таблицу.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)