Поделиться через


Распространенные шаблоны загрузки данных

Автозагрузчик упрощает ряд распространенных задач приема данных. В этом кратком справочнике приведены примеры нескольких популярных шаблонов.

Фильтрация каталогов или файлов с использованием glob-patterns

Глоб-шаблоны можно использовать для фильтрации каталогов и файлов, когда они указаны в пути.

Расписание Описание
? Соответствует любому одиночному символу
* Соответствует нулю или более символам
[abc] Соответствует одиночному символу из кодировки {a, b, c}.
[a-z] Соответствует одиночному символу из диапазона символов {a…z}.
[^a] Соответствует одиночному символу, который не относится к кодировке или диапазону символов {a}. Обратите внимание, что символ ^ должен стоять непосредственно справа от открывающей скобки.
{ab,cd} Соответствует строке из набора строк {ab, cd}.
{ab,c{de, fh}} Соответствует строке из набора строк {ab, cde, cfh}.

Используйте path для предоставления шаблонов префиксов, например:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Внимание

Для явного предоставления шаблонов суффиксов необходимо использовать параметр pathGlobFilter. path предоставляет только фильтр префиксов.

Например, если вы хотите проанализировать только файлы png в каталоге, содержащем файлы с разными суффиксами, можно выполнить указанные ниже команды.

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Примечание.

Поведение автозагрузчика по умолчанию отличается от поведения по умолчанию других источников файлов Spark. Добавьте .option("cloudFiles.useStrictGlobber", "true") в чтение, чтобы использовать глоббинг, соответствующий по умолчанию поведению Spark в источниках файлов. Дополнительную информацию о глоббинге можно найти в следующей таблице.

Расписание Путь к файлу Globber по умолчанию Строгий глоббер
/a/b /a/b/c/file.txt Да Да
/a/b /a/b_dir/c/file.txt Нет Нет
/a/b /a/b.txt Нет Нет
/a/b/ /a/b.txt Нет Нет
/a/*/c/ /a/b/c/file.txt Да Да
/a/*/c/ /a/b/c/d/file.txt Да Да
/a/*/c/ /a/b/x/y/c/file.txt Да Нет
/a/*/c /a/b/c_file.txt Да Нет
/a/*/c/ /a/b/c_file.txt Да Нет
/a/*/c/ /a/*/cookie/file.txt Да Нет
/a/b* /a/b.txt Да Да
/a/b* /a/b/file.txt Да Да
/a/{0.txt,1.txt} /a/0.txt Да Да
/a/*/{0.txt,1.txt} /a/0.txt Нет Нет
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Да Да

Включить простое ETL

Простой способ передать данные в Delta Lake без потери данных — использовать следующий шаблон и включить автоматическое определение схемы с функцией Автозагрузки. Databricks рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема автоматически считается состоящей из строковых типов, любые ошибки синтаксического анализа (их не должно возникать, если все данные останутся в виде строк) будут отправлены в _rescued_data, а любые новые столбцы приведут к сбою потока и изменению структуры схемы.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Предотвращение потери данных в хорошо структурированных данных

Если вы знакомы со схемой, но хотите знать, когда будете получать непредвиденные данные, Databricks рекомендует использовать rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Если вы хотите, чтобы поток прекратил обработку при добавлении нового поля, которое не соответствует схеме, можно добавить:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Обеспечение гибких конвейеров с частично структурированными данными

При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда он это делают, или у вас может не быть достаточной пропускной способности для обновления конвейера данных. Теперь вы можете использовать эволюцию схемы для перезапуска потока иАвтозагрузчик автоматически обновит выводимую схему. Можно также использовать schemaHints для некоторых бессхемных полей, которые может предоставить поставщик.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Преобразование вложенных данных JSON

Так как автозагрузчик выводит столбцы JSON верхнего уровня как строки, у вас могут остаться вложенные объекты JSON, требующие дальнейших преобразований. Вы можете использовать API доступа к частично структурированным данным для дальнейшего преобразования сложного содержимого JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Вывод вложенных данных JSON

При наличии вложенных данных можно использовать параметр cloudFiles.inferColumnTypes для вывода вложенной структуры данных и других типов столбцов.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Загрузка CSV-файлов без заголовков

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Применение схемы в CSV-файлах с заголовками

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Загрузка изображений и двоичных данных в Delta Lake для ML

После сохранения данных в Delta Lake можно выполнять распределенный вывод данных. Дополнительные сведения см. в разделе Выполнение распределенного вывода с помощью UDF pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Синтаксис автозагрузчика для DLT

DLT предоставляет немного измененный синтаксис Python для автозагрузчика добавляет поддержку SQL для автозагрузчика.

В следующих примерах для создания наборов данных из CSV- и JSON-файлов используется Автозагрузчик.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

Вы можете использовать поддерживаемые параметры формата для автозагрузчика. Опции для read_files представлены в виде пар «ключ-значение». Дополнительные сведения о поддерживаемых форматах и параметрах см. в разделе Параметры.

Рассмотрим пример.

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

В следующем примере показано, как из CSV-файлов с разделителями табуляции считываются данные с заголовком.

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

Вы можете использовать schema для указания формата вручную; вы должны указать schema для форматов, которые не поддерживают вывод схемы.

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

Примечание.

DLT автоматически настраивает и управляет каталогами схемы и контрольных точек при использовании автозагрузчика для чтения файлов. Однако если вы вручную настроите любой из этих каталогов, выполнение полного обновления не повлияет на содержимое настроенных каталогов. Для избежания непредвиденных побочных эффектов во время обработки, в Databricks рекомендуется использовать автоматически настроенные каталоги.