Typowe wzorce ładowania danych
Moduł automatycznego ładowania upraszcza szereg typowych zadań pozyskiwania danych. Ta szybka dokumentacja zawiera przykłady dla kilku popularnych wzorców.
Filtrowanie katalogów lub plików przy użyciu wzorców glob
Wzorce globu mogą służyć do filtrowania katalogów i plików, jeśli podano w ścieżce.
Wzorzec | opis |
---|---|
? |
Pasuje do dowolnego pojedynczego znaku |
* |
Dopasuje zero lub więcej znaków |
[abc] |
Dopasuje pojedynczy znak z zestawu znaków {a,b,c}. |
[a-z] |
Dopasuje pojedynczy znak z zakresu znaków {a... z}. |
[^a] |
Dopasuje pojedynczy znak, który nie pochodzi z zestawu znaków lub zakresu {a}. Należy pamiętać, że ^ znak musi występować natychmiast po prawej stronie nawiasu otwierającego. |
{ab,cd} |
Dopasuje ciąg z zestawu ciągów {ab, cd}. |
{ab,c{de, fh}} |
Dopasuje ciąg z zestawu ciągów {ab, cde, cfh}. |
Użyj elementu path
do udostępniania wzorców prefiksów, na przykład:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Ważne
Należy użyć opcji pathGlobFilter
jawnego udostępniania wzorców sufiksów. Jedyną path
wartością jest filtr prefiksu.
Jeśli na przykład chcesz przeanalizować tylko png
pliki w katalogu zawierającym pliki z różnymi sufiksami, możesz wykonać następujące czynności:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Uwaga
Domyślne zachowanie funkcji automatycznego ładowania globbingowego różni się od domyślnego zachowania innych źródeł plików platformy Spark. Dodaj .option("cloudFiles.useStrictGlobber", "true")
element do odczytu, aby użyć funkcji globbing, która jest zgodna z domyślnym zachowaniem platformy Spark względem źródeł plików. Zobacz następującą tabelę, aby uzyskać więcej informacji na temat tworzenia symboli globbingowych:
Wzorzec | Ścieżka pliku | Domyślny globber | Surowy globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Tak | Tak |
/a/b | /a/b_dir/c/file.txt | Nie | Nie |
/a/b | /a/b.txt | Nie | Nie |
/a/b/ | /a/b.txt | Nie | Nie |
/a/*/c/ | /a/b/c/file.txt | Tak | Tak |
/a/*/c/ | /a/b/c/d/file.txt | Tak | Tak |
/a/*/c/ | /a/b/x/y/c/file.txt | Tak | Nie |
/a/*/c | /a/b/c_file.txt | Tak | Nie |
/a/*/c/ | /a/b/c_file.txt | Tak | Nie |
/a/*/c/ | /a/*/cookie/file.txt | Tak | Nie |
/a/b* | /a/b.txt | Tak | Tak |
/a/b* | /a/b/file.txt | Tak | Tak |
/a/{0.txt,1.txt} | /a/0.txt | Tak | Tak |
/a/*/{0.txt,1.txt} | /a/0.txt | Nie | Nie |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Tak | Tak |
Włączanie łatwego etl
Łatwym sposobem uzyskania danych do usługi Delta Lake bez utraty danych jest użycie następującego wzorca i włączenie wnioskowania schematu za pomocą modułu ładującego automatycznego. Usługa Databricks zaleca uruchomienie następującego kodu w zadaniu usługi Azure Databricks w celu automatycznego ponownego uruchomienia strumienia po zmianie schematu danych źródłowych. Domyślnie schemat jest wywnioskowany jako typy ciągów, wszelkie błędy analizowania (nie powinno być żadnych, jeśli wszystko pozostanie jako ciąg) przejdzie do _rescued_data
, a wszystkie nowe kolumny nie powiedzą się strumieniu i ewoluują schemat.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Zapobieganie utracie danych w dobrze ustrukturyzowanych danych
Jeśli znasz schemat, ale chcesz wiedzieć, kiedy otrzymujesz nieoczekiwane dane, usługa Databricks zaleca użycie polecenia rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Jeśli chcesz, aby strumień przestał przetwarzać, jeśli wprowadzono nowe pole, które nie jest zgodne ze schematem, możesz dodać:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Włączanie elastycznych potoków danych częściowo ustrukturyzowanych
Gdy otrzymujesz dane od dostawcy, który wprowadza nowe kolumny do podanych informacji, możesz nie być świadomy dokładnie tego, kiedy to robią, lub nie masz przepustowości aktualizacji potoku danych. Teraz możesz użyć ewolucji schematu, aby ponownie uruchomić strumień i zezwolić automatycznemu modułowi ładującego zaktualizować wywnioskowany schemat. Możesz również skorzystać schemaHints
z niektórych pól "bez schematu", które dostawca może dostarczać.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Przekształcanie zagnieżdżonych danych JSON
Ponieważ moduł automatycznego ładowania wywnioskuje kolumny JSON najwyższego poziomu jako ciągi, można pozostawić z zagnieżdżonych obiektów JSON, które wymagają dalszych przekształceń. Interfejsy API dostępu do danych częściowo ustrukturyzowanych umożliwiają dalsze przekształcanie złożonej zawartości JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Wnioskowanie zagnieżdżonych danych JSON
W przypadku zagnieżdżenia danych można użyć cloudFiles.inferColumnTypes
opcji wnioskowania zagnieżdżonej struktury danych i innych typów kolumn.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Ładowanie plików CSV bez nagłówków
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Wymuszanie schematu w plikach CSV z nagłówkami
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Pozyskiwanie obrazów lub danych binarnych do usługi Delta Lake dla uczenia maszynowego
Gdy dane są przechowywane w usłudze Delta Lake, możesz uruchomić rozproszone wnioskowanie na danych. Zobacz Wykonywanie wnioskowania rozproszonego przy użyciu funkcji zdefiniowanej przez użytkownika biblioteki pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Składnia automatycznego modułu ładującego dla biblioteki DLT
Funkcja Delta Live Tables zapewnia nieco zmodyfikowaną składnię języka Python dla automatycznego modułu ładującego dodaje obsługę języka SQL dla modułu ładującego automatycznego.
W poniższych przykładach użyto narzędzia Auto Loader do tworzenia zestawów danych z plików CSV i JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map()
Za pomocą funkcji można przekazać opcje do cloud_files()
metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Poniższy przykład odczytuje dane z plików CSV rozdzielanych tabulatorami z nagłówkiem:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Możesz użyć schema
polecenia , aby ręcznie określić format. Należy określić schema
format dla formatów, które nie obsługują wnioskowania schematu:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Uwaga
Delta Live Tables automatycznie konfiguruje schemat i katalogi punktów kontrolnych oraz zarządza nimi podczas używania automatycznego modułu ładującego do odczytywania plików. Jeśli jednak ręcznie skonfigurujesz jeden z tych katalogów, wykonywanie pełnego odświeżania nie ma wpływu na zawartość skonfigurowanych katalogów. Usługa Databricks zaleca używanie automatycznie skonfigurowanych katalogów, aby uniknąć nieoczekiwanych skutków ubocznych podczas przetwarzania.