Поделиться через


Прием данных в виде полуструктурированного типа варианта

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

В Databricks Runtime 15.3 и более поздних версиях можно использовать VARIANT тип для приема полуструктурированных данных. В этой статье описано поведение и приведены примеры шаблонов приема данных из облачного хранилища объектов с помощью автозагрузчика и COPY INTO, потоковой передачи записей из Kafka и команд SQL для создания таблиц с вариантными данными или вставки новых записей с помощью типа варианта.

См . сведения о варианте запроса.

Создание таблицы с вариантным столбцом

VARIANT — это стандартный тип SQL в Databricks Runtime 15.3 и более поздних версий и поддерживается таблицами, поддерживаемыми Delta Lake. Управляемые таблицы в Azure Databricks по умолчанию используют Delta Lake, поэтому можно создать пустую таблицу с одним столбцом VARIANT с помощью следующего синтаксиса:

CREATE TABLE table_name (variant_column VARIANT)

Кроме того, можно использовать функцию PARSE_JSON в строке JSON, чтобы использовать инструкцию CTAS для создания таблицы с вариантным столбцом. В следующем примере создается таблица с двумя столбцами:

  • Столбец id, извлеченный из строки JSON в виде типа STRING.
  • Столбец variant_column содержит всю строку JSON, закодированную как тип VARIANT.
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Примечание.

Databricks рекомендует извлекать и хранить поля в качестве не вариантных столбцов, которые планируется использовать для ускорения запросов и оптимизации макета хранилища.

VARIANT столбцы нельзя использовать для кластеризации ключей, секций или ключей порядка Z. Тип данных VARIANT нельзя использовать для сравнения, группировки, упорядочивания и задания операций. Для получения полного списка ограничений см. Ограничения.

Вставка данных с помощью parse_json

Если целевая таблица уже содержит столбец, закодированный как VARIANT, можно использовать parse_json для вставки строковых записей JSON как VARIANT, как показано в следующем примере:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Прием данных из облачного хранилища объектов в качестве варианта

В Databricks Runtime 15.3 и более поздних версиях можно использовать автозагрузчик для загрузки всех данных из источников JSON в виде одного столбца VARIANT в целевой таблице. Так как VARIANT гибко адаптируется к изменениям схемы и типа и соблюдает чувствительность к регистру, а также учитывает NULL значения, присутствующие в источнике данных, этот шаблон является надежным для большинства сценариев загрузки данных с учетом следующих предостережений:

  • Неправильно сформированные записи JSON не могут быть закодированы с помощью VARIANT типа.
  • VARIANT Тип может хранить только записи размером до 16 мб.

Примечание.

Variant обрабатывает слишком большие записи, аналогичные поврежденным записям. В режиме обработки PERMISSIVE по умолчанию в столбце _malformed_data наряду с неправильно сформированными записями JSON фиксируются слишком большие записи.

Так как все данные из источника JSON записываются как один столбец VARIANT, во время приема схемы не происходит эволюция схемы и rescuedDataColumn не поддерживается. В следующем примере предполагается, что целевая таблица уже существует с одним столбцом VARIANT.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Можно также указать VARIANT при определении схемы или передаче schemaHints. Данные в поле источника, на который ссылается ссылка, должны содержать допустимую строку JSON. В следующих примерах показан этот синтаксис:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

Использование COPY INTO с вариантом

Databricks рекомендует использовать автозагрузчик COPY INTO при наличии.

COPY INTO поддерживает прием всего содержимого источника данных JSON в виде одного столбца. В следующем примере создается новая таблица с одним столбцом VARIANT, а затем используется COPY INTO для приема записей из источника JSON-файла.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Вы также можете определить любое поле в целевой таблице как VARIANT. При запуске COPY INTOсоответствующие поля в источнике данных получаются и приведение к VARIANT типу, как показано в следующих примерах:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Потоковая передача данных Kafka в качестве варианта

Многие потоки Kafka кодируют полезные данные с помощью JSON. Прием потоков Kafka с помощью VARIANT делает эти рабочие нагрузки надежными для изменений схемы.

В следующем примере показано чтение источника потоковой передачи Kafka, приведение key в качестве STRING и value как VARIANTи запись в целевую таблицу.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)