Настройка развития и вывода схемы в автозагрузчике
Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных данных, позволяя инициализировать таблицы без явного объявления схемы данных и развивать схему таблицы в виде новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени.
Автозагрузчик также может "спасти" непредвиденные данные (например, отличающиеся типы данных) в столбец BLOB-объектов JSON, доступ к которым можно затем получить с помощью частично структурированных API доступа к данным.
Для вывода и развития схемы поддерживаются следующие форматы:
File format | Поддерживаемые версии |
---|---|
JSON |
Все версии |
CSV |
Все версии |
XML |
Databricks Runtime 14.3 LTS и более поздних версий |
Avro |
Databricks Runtime 10.4 LTS и более поздних версий |
Parquet |
Databricks Runtime 11.3 LTS и более поздних версий |
ORC |
Не поддерживается |
Text |
Неприменимо (фиксированная схема) |
Binaryfile |
Неприменимо (фиксированная схема) |
Синтаксис для развития и вывода схемы
Указание целевого каталога для параметра cloudFiles.schemaLocation
включает развитие и вывод схемы. Вы можете использовать тот же каталог, который указали для checkpointLocation
. При использовании разностных динамических таблиц Azure Databricks автоматически управляет расположением схемы и другими сведениями о контрольной точке.
Примечание.
Если в целевую таблицу загружено несколько исходных данных, для каждой рабочей нагрузки приема автозагрузчика требуется отдельная контрольная точка потоковой передачи.
В следующем примере используется parquet
для cloudFiles.format
. Используйте csv
, avro
или json
для других источников файлов. Все остальные параметры чтения и записи остаются неизменными для поведения по умолчанию для каждого формата.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Как работает вывод схемы автозагрузчика?
Чтобы определить схему при первом чтении данных, автозагрузчик примеров первых 50 ГБ или 1000 файлов, которые он обнаруживает, в зависимости от того, какой предел пересекается сначала. Автозагрузчик сохраняет сведения о схеме в каталоге _schemas
, настроенном cloudFiles.schemaLocation
для отслеживания изменений схемы входных данных с течением времени.
Примечание.
Чтобы изменить размер используемой выборки, можно задать конфигурации SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(байтовая строка, например 10gb
)
и
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(целое число)
По умолчанию вывод схемы автозагрузчика стремится избежать проблем с эволюцией схемы из-за несоответствия типов. Для форматов, которые не кодируют типы данных (JSON, CSV и XML), автозагрузчик выводит все столбцы в виде строк (включая вложенные поля в JSON-файлах). Для форматов с типизированной схемой (Parquet и Avro) автозагрузчик примеров подмножества файлов и объединяет схемы отдельных файлов. Это поведение приводится в следующей таблице:
File format | Тип данных с выводом по умолчанию |
---|---|
JSON |
Строка |
CSV |
Строка |
XML |
Строка |
Avro |
Типы, закодированные в схеме Avro |
Parquet |
Типы, закодированные в схеме Parquet |
Apache Spark DataFrameReader использует другое поведение для вывода схемы, выбор типов данных для столбцов в json, CSV и XML-источниках на основе примеров данных. Чтобы включить это поведение с помощью автозагрузчика, задайте для параметра значение cloudFiles.inferColumnTypes
true
.
Примечание.
При выводе схемы для данных CSV автозагрузчик предполагает, что файлы содержат заголовки. Если CSV-файлы не содержат заголовков, укажите параметр .option("header", "false")
. Кроме того, Auto Loader объединяет схемы всех файлов в примере, чтобы получить глобальную схему. После этого Auto Loader может считывать каждый файл в соответствии с заголовком и правильно анализировать данные в формате CSV.
Примечание.
Если столбец имеет разные типы данных в двух файлах Parquet, автозагрузчик выбирает самый широкий тип. Для переопределения этого варианта можно использовать schemaHints . При указании подсказок схемы автозагрузчик не приводит столбец к указанному типу, а указывает средству чтения Parquet читать столбец как указанный тип. В случае несоответствия столбец спасается в спасаемом столбце данных.
Как работает эволюция схемы автозагрузчика?
Автозагрузчик обнаруживает добавление новых столбцов при обработке данных. Когда автозагрузчик обнаруживает новый столбец, поток останавливается с UnknownFieldException
помощью . Прежде чем поток выдает эту ошибку, автозагрузчик выполняет вывод схемы в последней микропакете данных и обновляет расположение схемы с последней схемой, объединяя новые столбцы в конец схемы. Типы данных существующих столбцов останутся неизменными.
Databricks рекомендует настроить потоки автозагрузчика с помощью заданий Databricks, чтобы автоматически перезапустить после таких изменений схемы.
Автозагрузчик поддерживает следующие режимы для развития схем, которые задаются в параметре cloudFiles.schemaEvolutionMode
:
Как секции работают с автозагрузчиком?
Автозагрузчик пытается определить столбцы секций из базовой структуры каталогов данных, если данные размещаются в секционирования стилей Hive. Например, путь к base_path/event=click/date=2021-04-01/f0.json
файлу приводит к выводу date
и event
как столбцам секционирования. Если базовая структура каталога содержит конфликтующие секции Hive или не содержит секционирование стилей Hive, столбцы секционирования игнорируются.
Двоичные файлы (binaryFile
) и text
форматы файлов имеют фиксированные схемы данных, но поддерживают вывод столбцов секций. Databricks рекомендует задать cloudFiles.schemaLocation
для этих форматов файлов. Это позволяет избежать возможных ошибок или потери информации и предотвращает вывод столбцов секций при каждом запуске автозагрузчика.
Столбцы секционирования не учитываются при развитии схемы. Если у вас была начальная структура каталога, например base_path/event=click/date=2021-04-01/f0.json
, а затем начать получать новые файлы в качестве base_path/event=click/date=2021-04-01/hour=01/f1.json
, автозагрузчик игнорирует столбец часа. Для записи сведений о новых столбцах секционирования задайте для cloudFiles.partitionColumns
значение event,date,hour
.
Примечание.
cloudFiles.partitionColumns
Параметр принимает разделенный запятыми список имен столбцов. Анализируются только столбцы, которые существуют как key=value
пары в структуре каталогов.
Что такое столбец спасенных данных?
При автоматической загрузке схемы автоматически добавляется в схему в качестве _rescued_data
спасенного столбца данных. Можно переименовать столбец или включить его в тех случаях, когда вы предоставляете схему параметром rescuedDataColumn
.
Спасаемый столбец данных гарантирует, что столбцы, которые не соответствуют схеме, будут спасены вместо удаления. Столбец спасенных данных содержит любые данные, которые не анализируются по следующим причинам:
- Столбец отсутствует в схеме.
- Несоответствия типов.
- Несоответствие регистра.
Столбец спасенных данных содержит JSON, содержащий спасаемые столбцы и путь к исходному файлу записи.
Примечание.
Синтаксические анализаторы JSON и CSV поддерживают три режима при анализе записей: PERMISSIVE
, DROPMALFORMED
и FAILFAST
. При использовании вместе с rescuedDataColumn
несоответствие типов данных не приводит к удалению записей в режиме DROPMALFORMED
или возникновению ошибки в режиме FAILFAST
. Будут удалены или приведут к ошибке только поврежденные записи, т. е. неполные или неправильные файлы JSON или CSV. Если вы используете badRecordsPath
при синтаксическом анализе JSON или CSV, несоответствия типов данных не будут считаться неправильными записями при использовании rescuedDataColumn
. В badRecordsPath
хранятся только неполные и неправильные записи JSON или CSV.
Изменение поведения с учетом регистра
Если учет регистра не включен, столбцы abc
, Abc
и ABC
считаются одним столбцом в целях вывода схемы. Выбор регистра является произвольным и зависит от данных выборки. Вы можете использовать указания схемы, чтобы определить, какой регистр следует использовать. После выбора и вывода схемы автозагрузчик не будет учитывать варианты регистра, которые не были выбраны с учетом схемы.
Если столбец восстановленных данных включен, поля, именованные в регистре, отличном от указанного в схеме, будут загружены в столбец _rescued_data
. Измените это поведение, установив для параметра readerCaseSensitive
значение false. В этом случае автозагрузчик будет считывать данные без учета регистра.
Переопределение вывода схемы с указанием схемы
Вы можете использовать подсказки схемы для принудительного применения сведений о схеме, которые вы знаете и ожидаете в выводимых схемах. Если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, double
вместо него integer
), можно указать произвольное количество подсказок для типов данных столбцов в виде строки с помощью синтаксиса спецификации схемы SQL, например следующего:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Список поддерживаемых типов данных см. в документации по типам данных.
Если столбец отсутствует в начале потока, можно также использовать указания схемы, чтобы добавить этот столбец в выведенную схему.
Ниже приведен пример выведенной схемы, чтобы прояснить роль указаний схемы.
Выведенная схема:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Задав следующие указания схемы:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
вы получите:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Примечание.
Поддержка указаний для схем Array и Map доступна в Databricks Runtime 9.1 LTS и более поздних версий.
Ниже приведен пример выведенной схемы со сложными типами, чтобы прояснить роль указаний схемы.
Выведенная схема:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Задав следующие указания схемы:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
вы получите:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Примечание.
Указания схемы используются только в том случае, если вы не предоставляете схема Автозагрузчику. Указания схемы можно использовать независимо от того, включен ли параметр cloudFiles.inferColumnTypes
или отключен.