Udostępnij za pośrednictwem


Typowe wzorce ładowania danych

Moduł automatycznego ładowania upraszcza szereg typowych zadań pozyskiwania danych. Ta szybka dokumentacja zawiera przykłady dla kilku popularnych wzorców.

Filtrowanie katalogów lub plików przy użyciu wzorców glob

Wzorce globu mogą służyć do filtrowania katalogów i plików, jeśli podano w ścieżce.

Wzorzec opis
? Pasuje do dowolnego pojedynczego znaku
* Dopasuje zero lub więcej znaków
[abc] Dopasuje pojedynczy znak z zestawu znaków {a,b,c}.
[a-z] Dopasuje pojedynczy znak z zakresu znaków {a... z}.
[^a] Dopasuje pojedynczy znak, który nie pochodzi z zestawu znaków lub zakresu {a}. Należy pamiętać, że ^ znak musi występować natychmiast po prawej stronie nawiasu otwierającego.
{ab,cd} Dopasuje ciąg z zestawu ciągów {ab, cd}.
{ab,c{de, fh}} Dopasuje ciąg z zestawu ciągów {ab, cde, cfh}.

Użyj elementu path do udostępniania wzorców prefiksów, na przykład:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Ważne

Należy użyć opcji pathGlobFilter jawnego udostępniania wzorców sufiksów. Jedyną path wartością jest filtr prefiksu.

Jeśli na przykład chcesz przeanalizować tylko png pliki w katalogu zawierającym pliki z różnymi sufiksami, możesz wykonać następujące czynności:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Uwaga

Domyślne zachowanie funkcji automatycznego ładowania globbingowego różni się od domyślnego zachowania innych źródeł plików platformy Spark. Dodaj .option("cloudFiles.useStrictGlobber", "true") element do odczytu, aby użyć funkcji globbing, która jest zgodna z domyślnym zachowaniem platformy Spark względem źródeł plików. Zobacz następującą tabelę, aby uzyskać więcej informacji na temat globbingu:

Wzorzec Ścieżka pliku Domyślny globber Surowy globber
/a/b /a/b/c/file.txt Tak Tak
/a/b /a/b_dir/c/file.txt Nie Nie
/a/b /a/b.txt Nie Nie
/a/b/ /a/b.txt Nie Nie
/a/*/c/ /a/b/c/file.txt Tak Tak
/a/*/c/ /a/b/c/d/file.txt Tak Tak
/a/*/c/ /a/b/x/y/c/file.txt Tak Nie
/a/*/c /a/b/c_file.txt Tak Nie
/a/*/c/ /a/b/c_file.txt Tak Nie
/a/*/c/ /a/*/cookie/file.txt Tak Nie
/a/b* /a/b.txt Tak Tak
/a/b* /a/b/file.txt Tak Tak
/a/{0.txt,1.txt} /a/0.txt Tak Tak
/a/*/{0.txt,1.txt} /a/0.txt Nie Nie
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Tak Tak

Włączanie łatwego etl

Łatwym sposobem wprowadzenia danych do Delta Lake bez utraty danych jest zastosowanie następującego wzorca i włączenie wnioskowania schematu za pomocą Auto Loader. Usługa Databricks zaleca uruchomienie następującego kodu w zadaniu usługi Azure Databricks w celu automatycznego ponownego uruchomienia strumienia po zmianie schematu danych źródłowych. Domyślnie schemat jest uznany za typy stringów, wszelkie błędy analizowania (nie powinno być żadnych, jeśli wszystko pozostanie jako string) przejdą do _rescued_data, a wszystkie nowe kolumny spowodują błąd strumienia i zmienią schemat.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Zapobieganie utracie danych w dobrze ustrukturyzowanych danych

Gdy znasz schemat, ale chcesz wiedzieć, kiedy otrzymujesz nieoczekiwane dane, usługa Databricks zaleca użycie rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jeśli chcesz, aby strumień przestał przetwarzać, jeśli wprowadzono nowe pole, które nie jest zgodne ze schematem, możesz dodać:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Włączanie elastycznych potoków danych częściowo ustrukturyzowanych

Kiedy otrzymujesz dane od dostawcy, który wprowadza nowe kolumny do dostarczanych informacji, możesz nie być świadomy tego, kiedy dokładnie to robią, lub możesz nie mieć możliwości zaktualizować swojego potoku danych. Teraz możesz użyć ewolucji schematu, aby ponownie uruchomić strumień i zezwolić automatycznemu modułowi ładującego zaktualizować wywnioskowany schemat. Możesz również skorzystać schemaHints z niektórych pól "bez schematu", które dostawca może dostarczać.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Przekształcanie zagnieżdżonych danych JSON

Ponieważ Auto Loader interpretuje kolumny JSON najwyższego poziomu jako ciągi znaków, mogą pozostać zagnieżdżone obiekty JSON, które wymagają dalszych przekształceń. Interfejsy API dostępu do danych częściowo ustrukturyzowanych umożliwiają dalsze przekształcanie złożonej zawartości JSON.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Wnioskowanie zagnieżdżonych danych JSON

W przypadku zagnieżdżenia danych można użyć opcji cloudFiles.inferColumnTypes, aby wywnioskować zagnieżdżoną strukturę danych oraz inne typy kolumn.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Ładowanie plików CSV bez nagłówków

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Wymuszanie schematu w plikach CSV z nagłówkami

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Pozyskiwanie obrazów lub danych binarnych do usługi Delta Lake dla uczenia maszynowego

Gdy dane są przechowywane w usłudze Delta Lake, możesz uruchomić rozproszone wnioskowanie na danych. Zobacz Wykonywanie wnioskowania rozproszonego przy użyciu funkcji zdefiniowanej przez użytkownika biblioteki pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Składnia automatycznego modułu ładującego dla biblioteki DLT

Funkcja Delta Live Tables zapewnia nieco zmodyfikowaną składnię języka Python dla Auto Loader oraz dodaje obsługę języka SQL dla Auto Loader.

W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")

Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map() Za pomocą funkcji można przekazać opcje do read_files() metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Poniższy przykład odczytuje dane z plików CSV rozdzielanych tabulatorami z nagłówkiem:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Możesz użyć schema, aby ręcznie określić format; Należy określić schema dla formatów, które nie obsługują wnioskowania schematu :

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM read_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Uwaga

Delta Live Tables automatycznie konfiguruje schemat i katalogi punktów kontrolnych oraz zarządza nimi podczas używania automatycznego modułu ładującego do odczytywania plików. Jeśli jednak ręcznie skonfigurujesz jeden z tych katalogów, wykonywanie pełnego odświeżania nie ma wpływu na zawartość skonfigurowanych katalogów. Usługa Databricks zaleca używanie automatycznie skonfigurowanych katalogów, aby uniknąć nieoczekiwanych skutków ubocznych podczas przetwarzania.