Распространенные шаблоны загрузки данных
Автозагрузчик упрощает ряд распространенных задач приема данных. В этом кратком справочнике приведены примеры нескольких популярных шаблонов.
Фильтрация каталогов или файлов с использованием glob-patterns
Глоб-шаблоны можно использовать для фильтрации каталогов и файлов, когда они указаны в пути.
Расписание | Описание |
---|---|
? |
Соответствует любому одиночному символу |
* |
Соответствует нулю или более символам |
[abc] |
Соответствует одиночному символу из кодировки {a, b, c}. |
[a-z] |
Соответствует одиночному символу из диапазона символов {a…z}. |
[^a] |
Соответствует одиночному символу, который не относится к кодировке или диапазону символов {a}. Обратите внимание, что символ ^ должен стоять непосредственно справа от открывающей скобки. |
{ab,cd} |
Соответствует строке из набора строк {ab, cd}. |
{ab,c{de, fh}} |
Соответствует строке из набора строк {ab, cde, cfh}. |
Используйте path
для предоставления шаблонов префиксов, например:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Внимание
Для явного предоставления шаблонов суффиксов необходимо использовать параметр pathGlobFilter
.
path
предоставляет только фильтр префиксов.
Например, если вы хотите проанализировать только файлы png
в каталоге, содержащем файлы с разными суффиксами, можно выполнить указанные ниже команды.
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Примечание.
Поведение автозагрузчика по умолчанию отличается от поведения по умолчанию других источников файлов Spark. Добавьте .option("cloudFiles.useStrictGlobber", "true")
в чтение, чтобы использовать глоббинг, соответствующий по умолчанию поведению Spark в источниках файлов. Дополнительную информацию о глоббинге можно найти в следующей таблице.
Расписание | Путь к файлу | Globber по умолчанию | Строгий глоббер |
---|---|---|---|
/a/b | /a/b/c/file.txt | Да | Да |
/a/b | /a/b_dir/c/file.txt | Нет | Нет |
/a/b | /a/b.txt | Нет | Нет |
/a/b/ | /a/b.txt | Нет | Нет |
/a/*/c/ | /a/b/c/file.txt | Да | Да |
/a/*/c/ | /a/b/c/d/file.txt | Да | Да |
/a/*/c/ | /a/b/x/y/c/file.txt | Да | Нет |
/a/*/c | /a/b/c_file.txt | Да | Нет |
/a/*/c/ | /a/b/c_file.txt | Да | Нет |
/a/*/c/ | /a/*/cookie/file.txt | Да | Нет |
/a/b* | /a/b.txt | Да | Да |
/a/b* | /a/b/file.txt | Да | Да |
/a/{0.txt,1.txt} | /a/0.txt | Да | Да |
/a/*/{0.txt,1.txt} | /a/0.txt | Нет | Нет |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Да | Да |
Включить простое ETL
Простой способ передать данные в Delta Lake без потери данных — использовать следующий шаблон и включить автоматическое определение схемы с функцией Автозагрузки. Databricks рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема автоматически считается состоящей из строковых типов, любые ошибки синтаксического анализа (их не должно возникать, если все данные останутся в виде строк) будут отправлены в _rescued_data
, а любые новые столбцы приведут к сбою потока и изменению структуры схемы.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Предотвращение потери данных в хорошо структурированных данных
Если вы знакомы со схемой, но хотите знать, когда будете получать непредвиденные данные, Databricks рекомендует использовать rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Если вы хотите, чтобы поток прекратил обработку при добавлении нового поля, которое не соответствует схеме, можно добавить:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Обеспечение гибких конвейеров с частично структурированными данными
При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда он это делают, или у вас может не быть достаточной пропускной способности для обновления конвейера данных. Теперь вы можете использовать эволюцию схемы для перезапуска потока иАвтозагрузчик автоматически обновит выводимую схему. Можно также использовать schemaHints
для некоторых бессхемных полей, которые может предоставить поставщик.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Преобразование вложенных данных JSON
Так как автозагрузчик выводит столбцы JSON верхнего уровня как строки, у вас могут остаться вложенные объекты JSON, требующие дальнейших преобразований. Вы можете использовать API доступа к частично структурированным данным для дальнейшего преобразования сложного содержимого JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Вывод вложенных данных JSON
При наличии вложенных данных можно использовать параметр cloudFiles.inferColumnTypes
для вывода вложенной структуры данных и других типов столбцов.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Загрузка CSV-файлов без заголовков
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Применение схемы в CSV-файлах с заголовками
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Загрузка изображений и двоичных данных в Delta Lake для ML
После сохранения данных в Delta Lake можно выполнять распределенный вывод данных. Дополнительные сведения см. в разделе Выполнение распределенного вывода с помощью UDF pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Синтаксис автозагрузчика для DLT
DLT предоставляет немного измененный синтаксис Python для автозагрузчика добавляет поддержку SQL для автозагрузчика.
В следующих примерах для создания наборов данных из CSV- и JSON-файлов используется Автозагрузчик.
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders/",
format => "json")
Вы можете использовать поддерживаемые параметры формата для автозагрузчика. Опции для read_files
представлены в виде пар «ключ-значение». Дополнительные сведения о поддерживаемых форматах и параметрах см. в разделе Параметры.
Рассмотрим пример.
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)
В следующем примере показано, как из CSV-файлов с разделителями табуляции считываются данные с заголовком.
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv",
delimiter => "\t",
header => "true"
)
Вы можете использовать schema
для указания формата вручную; вы должны указать schema
для форматов, которые не поддерживают вывод схемы.
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
format => "parquet",
schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)
Примечание.
DLT автоматически настраивает и управляет каталогами схемы и контрольных точек при использовании автозагрузчика для чтения файлов. Однако если вы вручную настроите любой из этих каталогов, выполнение полного обновления не повлияет на содержимое настроенных каталогов. Для избежания непредвиденных побочных эффектов во время обработки, в Databricks рекомендуется использовать автоматически настроенные каталоги.