Běžné vzory načítání dat
Automatický zavaděč zjednodušuje řadu běžných úloh příjmu dat. Tato rychlá referenční příručka obsahuje příklady pro několik oblíbených vzorů.
Filtrování adresářů nebo souborů pomocí vzorů globu
Vzory globu lze použít pro filtrování adresářů a souborů, pokud jsou v cestě k dispozici.
Vzor | Popis |
---|---|
? |
Odpovídá jakémukoli jednomu znaku. |
* |
Odpovídá nule nebo více znaků |
[abc] |
Odpovídá jednomu znaku ze znakové sady {a,b,c}. |
[a-z] |
Odpovídá jednomu znaku z rozsahu znaků {a... z}. |
[^a] |
Odpovídá jednomu znaku, který není ze znakové sady nebo rozsahu {a}. Všimněte si, že ^ znak musí nastat okamžitě napravo od levé závorky. |
{ab,cd} |
Odpovídá řetězci ze sady řetězců {ab, cd}. |
{ab,c{de, fh}} |
Odpovídá řetězci ze sady řetězců {ab, cde, cfh}. |
path
Použijte k poskytování vzorů předpon, například:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Důležité
Musíte použít možnost pathGlobFilter
explicitního poskytování vzorů přípon. Jediný path
filtr předpony.
Pokud například chcete parsovat jenom png
soubory v adresáři, který obsahuje soubory s různými příponami, můžete:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Poznámka:
Výchozí chování automatického zavaděče se liší od výchozího chování jiných zdrojů souborů Spark. Přidejte .option("cloudFiles.useStrictGlobber", "true")
do čtení, abyste použili globbing, který odpovídá výchozímu chování Sparku vůči zdrojům souborů. Další informace o globbingu najdete v následující tabulce:
Vzor | Cesta k souboru | Výchozí globber | Striktní globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Ano | Ano |
/a/b | /a/b_dir/c/file.txt | Ne | Ne |
/a/b | /a/b.txt | Ne | Ne |
/a/b/ | /a/b.txt | Ne | Ne |
/a/*/c/ | /a/b/c/file.txt | Ano | Ano |
/a/*/c/ | /a/b/c/d/file.txt | Ano | Ano |
/a/*/c/ | /a/b/x/y/c/file.txt | Ano | Ne |
/a/*/c | /a/b/c_file.txt | Ano | Ne |
/a/*/c/ | /a/b/c_file.txt | Ano | Ne |
/a/*/c/ | /a/*/cookie/file.txt | Ano | Ne |
/a/b* | /a/b.txt | Ano | Ano |
/a/b* | /a/b/file.txt | Ano | Ano |
/a/{0.txt,1.txt} | /a/0.txt | Ano | Ano |
/a/*/{0.txt,1.txt} | /a/0.txt | Ne | Ne |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Ano | Ano |
Povolení snadného ETL
Jednoduchý způsob, jak dostat data do Delta Lake bez ztráty dat, je použít následující vzor a povolit odvozování schématu pomocí Auto Loaderu. Databricks doporučuje spustit následující kód v úloze Azure Databricks, aby automaticky restartoval datový proud, když se změní schéma zdrojových dat. Ve výchozím nastavení je schéma odvozeno jako typy řetězců, všechny chyby analýzy (pokud vše zůstane jako řetězec) budou směřovat do _rescued_data
a všechny nové sloupce způsobí selhání datového proudu a vývoj schématu.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Prevence ztráty dat ve dobře strukturovaných datech
Když znáte schéma, ale chcete vědět, kdykoli obdržíte neočekávaná data, databricks doporučuje používat rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Pokud chcete, aby stream přestal zpracovávat, pokud se zavádí nové pole, které neodpovídá vašemu schématu, můžete přidat:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Povolení flexibilních částečně strukturovaných datových kanálů
Když dostáváte data od dodavatele, který do informací, které poskytují, zavádí nové sloupce, nemusíte vědět přesně, kdy to dělají, nebo možná nebudete mít šířku pásma pro aktualizaci datového kanálu. Nyní můžete využít vývoj schématu k restartování streamu a nechat Auto Loader aktualizovat odvozené schéma. Můžete také využít schemaHints
některá pole bez schématu, která může dodavatel poskytnout.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformace vnořených dat JSON
Jelikož automatický zavaděč odvozuje sloupce JSON nejvyšší úrovně jako řetězce, mohou zůstat vnořené objekty JSON, které vyžadují další transformace. Pomocí částečně strukturovaných rozhraní API pro přístup k datům můžete dále transformovat složitý obsah JSON.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Odvození vnořených dat JSON
Pokud máte vnořená data, můžete použít možnost cloudFiles.inferColumnTypes
k odvození vnořené struktury dat a dalších typů sloupců.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Načtení souborů CSV bez hlaviček
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Vynucení schématu u souborů CSV s hlavičkami
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingestování obrázků nebo binárních dat do Delta Lake pro ML
Jakmile jsou data uložená v Delta Lake, můžete na datech spustit distribuované odvozování. Viz Provádění distribuovaných odvozování pomocí funkce UDF pandas.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Syntaxe automatického zavaděče pro DLT
Delta Live Tables poskytuje mírně upravenou syntaxi Pythonu pro Auto Loader a přidává podporu SQL pro Auto Loader.
Následující příklady používají automatický zavaděč k vytvoření datových sad ze souborů CSV a JSON:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
U automatického zavaděče můžete použít podporované možnosti formátování.
map()
Pomocí funkce můžete metodě předat možnostiread_files()
. Možnosti jsou páry klíč-hodnota, kde klíče a hodnoty jsou řetězce. Následující popis syntaxe pro práci s automatickým zavaděčem v SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Následující příklad načte data ze souborů CSV oddělených tabulátory s hlavičkou:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
K ručnímu zadání formátu můžete použít schema
; Je nutné zadat schema
pro formáty, které nepodporují odvození schématu:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Poznámka:
Delta Live Tables automaticky konfiguruje a spravuje adresáře schématu a kontrolních bodů při použití Auto Loader ke čtení souborů. Pokud však ručně nakonfigurujete některý z těchto adresářů, provedení úplné aktualizace nemá vliv na obsah nakonfigurovaných adresářů. Databricks doporučuje používat automaticky nakonfigurované adresáře, aby se zabránilo neočekávaným vedlejším účinkům během zpracování.