Vanliga datainläsningsmönster
Automatisk inläsning förenklar ett antal vanliga datainmatningsuppgifter. Den här snabbreferensen innehåller exempel på flera populära mönster.
Filtrera kataloger eller filer med hjälp av globmönster
Globmönster kan användas för att filtrera kataloger och filer när de anges i sökvägen.
Mönster | beskrivning |
---|---|
? |
Matchar ett enskilt tecken |
* |
Matchar noll eller fler tecken |
[abc] |
Matchar ett enskilt tecken från teckenuppsättningen {a,b,c}. |
[a-z] |
Matchar ett enskilt tecken från teckenområdet {a... z}. |
[^a] |
Matchar ett enskilt tecken som inte kommer från teckenuppsättningen eller intervallet {a}. Observera att ^ tecknet måste ske omedelbart till höger om den inledande hakparentesen. |
{ab,cd} |
Matchar en sträng från stränguppsättningen {ab, cd}. |
{ab,c{de, fh}} |
Matchar en sträng från stränguppsättningen {ab, cde, cfh}. |
path
Använd för att tillhandahålla prefixmönster, till exempel:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Viktigt!
Du måste använda alternativet pathGlobFilter
för att uttryckligen tillhandahålla suffixmönster. Det path
enda ger ett prefixfilter.
Om du till exempel bara png
vill parsa filer i en katalog som innehåller filer med olika suffix kan du göra följande:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Kommentar
Standardbeteendet för den automatiska inläsaren skiljer sig från standardbeteendet för andra Spark-filkällor. Lägg till .option("cloudFiles.useStrictGlobber", "true")
i läsningen om du vill använda globbning som matchar standardbeteendet för Spark mot filkällor. Mer information om globbning finns i följande tabell:
Mönster | Filsökväg | Standardglobber | Strikt klotber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Ja | Ja |
/a/b | /a/b_dir/c/file.txt | Nej | Nej |
/a/b | /a/b.txt | Nej | Nej |
/a/b/ | /a/b.txt | Nej | Nej |
/a/*/c/ | /a/b/c/file.txt | Ja | Ja |
/a/*/c/ | /a/b/c/d/file.txt | Ja | Ja |
/a/*/c/ | /a/b/x/y/c/file.txt | Ja | Nej |
/a/*/c | /a/b/c_file.txt | Ja | Nej |
/a/*/c/ | /a/b/c_file.txt | Ja | Nej |
/a/*/c/ | /a/*/cookie/file.txt | Ja | Nej |
/a/b* | /a/b.txt | Ja | Ja |
/a/b* | /a/b/file.txt | Ja | Ja |
/a/{0.txt,1.txt} | /a/0.txt | Ja | Ja |
/a/*/{0.txt,1.txt} | /a/0.txt | Nej | Nej |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Ja | Ja |
Aktivera enkel ETL
Ett enkelt sätt att hämta dina data till Delta Lake utan att förlora några data är att använda följande mönster och aktivera schemainferens med Auto Loader. Databricks rekommenderar att du kör följande kod i ett Azure Databricks-jobb så att den automatiskt startar om dataströmmen när schemat för dina källdata ändras. Som standard härleds schemat som strängtyper, eventuella parsningsfel (det bör inte finnas något om allt förblir som en sträng) kommer att gå till _rescued_data
, och alla nya kolumner kommer att misslyckas strömmen och utveckla schemat.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Förhindra dataförlust i välstrukturerade data
När du känner till ditt schema, men vill veta när du får oväntade data, rekommenderar Databricks att du använder rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Om du vill att dataströmmen ska sluta bearbetas om ett nytt fält introduceras som inte matchar schemat kan du lägga till:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Aktivera flexibla halvstrukturerade datapipelines
När du tar emot data från en leverantör som introducerar nya kolumner i den information de tillhandahåller kanske du inte känner till exakt när de gör det, eller så kanske du inte har bandbredden för att uppdatera din datapipeline. Nu kan du använda schemautveckling för att starta om strömmen och låta Auto Loader uppdatera det härledda schemat automatiskt. Du kan också använda schemaHints
för några av de "schemalösa" fält som leverantören kan tillhandahålla.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformera kapslade JSON-data
Eftersom Auto Loader härleder JSON-kolumnerna på den översta nivån som strängar kan du lämnas med kapslade JSON-objekt som kräver ytterligare omvandlingar. Du kan använda api:er för halvstrukturerad dataåtkomst för att ytterligare transformera komplext JSON-innehåll.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Härled kapslade JSON-data
När du har kapslade data kan du använda cloudFiles.inferColumnTypes
alternativet för att härleda den kapslade strukturen för dina data och andra kolumntyper.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Läs in CSV-filer utan rubriker
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Framtvinga ett schema på CSV-filer med rubriker
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Mata in bild- eller binärdata till Delta Lake för ML
När data har lagrats i Delta Lake kan du köra distribuerad slutsatsdragning på data. Se Utföra distribuerad slutsatsdragning med pandas UDF.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Syntax för automatisk inläsning för DLT
Delta Live Tables innehåller något ändrad Python-syntax för automatisk inläsning lägger till SQL-stöd för automatisk inläsning.
I följande exempel används Auto Loader för att skapa datauppsättningar från CSV- och JSON-filer:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
Du kan använda formatalternativ som stöds med Auto Loader. Med hjälp av map()
funktionen kan du skicka alternativ till read_files()
metoden. Alternativen är nyckel/värde-par, där nycklar och värden är strängar. Följande beskriver syntaxen för att arbeta med automatisk inläsning i SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
I följande exempel läss data från flikavgränsade CSV-filer med en rubrik:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Du kan använda schema
för att ange formatet manuellt. Du måste ange schema
för format som inte stöder schemainferens:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Kommentar
Delta Live Tables konfigurerar och hanterar automatiskt schema- och kontrollpunktskatalogerna när du använder Auto Loader för att läsa filer. Men om du konfigurerar någon av dessa kataloger manuellt påverkas inte innehållet i de konfigurerade katalogerna om du utför en fullständig uppdatering. Databricks rekommenderar att du använder de automatiskt konfigurerade katalogerna för att undvika oväntade biverkningar under bearbetningen.