Allgemeine Muster zum Laden von Daten
Autoloader vereinfacht eine Reihe allgemeiner Aufgaben für die Datenerfassung. Diese Kurzübersicht enthält Beispiele für mehrere beliebte Muster.
Filtern von Verzeichnissen oder Dateien mit Globmustern
Globmuster können zum Filtern von Verzeichnissen und Dateien verwendet werden, wenn sie im Pfad angegeben werden.
Muster | BESCHREIBUNG |
---|---|
? |
Führt einen Abgleich für ein einzelnes Zeichen durch |
* |
Entspricht null oder mehr Zeichen |
[abc] |
Entspricht einem einzelnen Zeichen aus dem Zeichensatz {a,b,c}. |
[a-z] |
Entspricht einem einzelnen Zeichen aus dem Zeichenbereich {a... z}. |
[^a] |
Entspricht einem einzelnen Zeichen, das nicht aus dem Zeichensatz oder Bereich {a} stammt. Beachten Sie, dass das Zeichen ^ sofort rechts neben der öffnenden Klammer auftreten muss. |
{ab,cd} |
Entspricht einer Zeichenfolge aus dem Zeichenfolgensatz {ab, cd}. |
{ab,c{de, fh}} |
Entspricht einer Zeichenfolge aus dem Zeichenfolgensatz {ab, cde, cfh}. |
Verwenden Sie zum Bereitstellen von Präfixmustern path
, z. B.:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Wichtig
Sie müssen die Option pathGlobFilter
zum expliziten Bereitstellen von Suffixmustern verwenden. path
stellt nur einen Präfixfilter bereit.
Wenn Sie beispielsweise nur png
-Dateien in einem Verzeichnis analysieren möchten, das Dateien mit unterschiedlichen Suffixen enthält, haben Sie folgende Möglichkeiten:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Hinweis
Das Standard-Globbingverhalten des Autoloaders unterscheidet sich vom Standardverhalten anderer Spark-Dateiquellen. Fügen Sie Ihrem Lesevorgang .option("cloudFiles.useStrictGlobber", "true")
hinzu, um für Dateiquellen Globbing zu verwenden, das dem Spark-Standardverhalten entspricht. Weitere Informationen zu Globbing finden Sie in der folgenden Tabelle:
Muster | Dateipfad | Standard-Globber | Strenger Globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | Ja | Ja |
/a/b | /a/b_dir/c/file.txt | Nein | Nein |
/a/b | /a/b.txt | Nein | Nein |
/a/b/ | /a/b.txt | Nein | Nein |
/a/*/c/ | /a/b/c/file.txt | Ja | Ja |
/a/*/c/ | /a/b/c/d/file.txt | Ja | Ja |
/a/*/c/ | /a/b/x/y/c/file.txt | Ja | Nein |
/a/*/c | /a/b/c_file.txt | Ja | Nein |
/a/*/c/ | /a/b/c_file.txt | Ja | Nein |
/a/*/c/ | /a/*/cookie/file.txt | Ja | Nein |
/a/b* | /a/b.txt | Ja | Ja |
/a/b* | /a/b/file.txt | Ja | Ja |
/a/{0.txt,1.txt} | /a/0.txt | Ja | Ja |
/a/*/{0.txt,1.txt} | /a/0.txt | Nein | Nein |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Ja | Ja |
Aktivieren der einfachen ETL
Eine einfache Möglichkeit, Ihre Daten ohne Datenverlust in Delta Lake zu übertragen, ist die Verwendung des folgenden Musters und die Aktivierung des Schemarückschlusses mit Autoloader. Databricks empfiehlt, den folgenden Code in einem Azure Databricks-Auftrag auszuführen, damit Ihr Stream automatisch neu gestartet wird, wenn sich das Schema Ihrer Quelldaten ändert. Standardmäßig wird das Schema als Zeichenfolgentyp abgeleitet, alle Analysefehler (es sollte keine geben, wenn alles als Zeichenfolge vorliegt) werden an _rescued_data
weitergeleitet, und alle neuen Spalten lassen den Stream fehlschlagen und entwickeln das Schema weiter.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Verhindern von Datenverlust in gut strukturierten Daten
Wenn Sie Ihr Schema kennen, aber wissen möchten, wann Sie unerwartete Daten erhalten, empfiehlt Databricks die Verwendung von rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Falls Ihr Stream die Verarbeitung beenden soll, wenn ein neues Feld eingeführt wird, das nicht mit Ihrem Schema übereinstimmt, können Sie Folgendes hinzufügen:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Aktivieren flexibler semistrukturierter Datenpipelines
Wenn Sie Daten von einem Anbieter erhalten, der neue Spalten in die von ihm bereitgestellten Informationen einführt, wissen Sie möglicherweise nicht genau, wann er dies tut, oder Sie haben nicht die Bandbreite, um Ihre Datenpipeline zu aktualisieren. Sie können nun die Schemaentwicklung nutzen, um den Stream neu zu starten und das abgeleitete Schema automatisch von Autoloader aktualisieren zu lassen. Sie können schemaHints
auch für einige der „schemalosen“ Felder nutzen, die der Anbieter möglicherweise bereitstellt.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Hinzufügen von geschachtelten JSON-Daten
Da Auto Loader die JSON-Spalten der obersten Ebene als Zeichenfolgen ableitet, können Sie mit verschachtelten JSON-Objekten zurückbleiben, die weitere Transformationen erfordern. Sie können die APIs für den semistrukturierten Datenzugriff nutzen, um komplexe JSON-Inhalte weiter zu transformieren.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Hinzufügen von geschachtelten JSON-Daten
Wenn Sie geschachtelte Daten haben, können Sie die cloudFiles.inferColumnTypes
-Option verwenden, um die geschachtelte Struktur Ihrer Daten und anderer Spaltentypen abzuleiten.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Laden von CSV-Dateien ohne Kopfzeilen
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Erzwingen eines Schemas für CSV-Dateien mit Headern
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Nehmen Sie Bild- oder Binärdaten in Delta Lake für ML auf
Sobald die Daten in Delta Lake gespeichert sind, können Sie verteilte Rückschlüsse auf die Daten ausführen. Siehe Verteilte Inferenz mit Pandas-UDF durchführen.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Autoloader-Syntax für DLT
Delta Live Tables bietet eine leicht geänderte Python-Syntax für Auto Loader und fügt SQL-Unterstützung für Auto Loader hinzu.
In den folgenden Beispielen wird der Autoloader verwendet, um Datasets aus CSV- und JSON-Dateien zu erstellen:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Sie können unterstützte Formatoptionen mit dem Autoloader verwenden. Mithilfe der map()
-Funktion können Sie Optionen an die cloud_files()
-Methode übergeben. Die Optionen sind Schlüssel-Wert-Paare, bei denen die Schlüssel und Werte Zeichenfolgen sind. Im Folgenden wird die Syntax für die Arbeit mit dem Autoloader in SQL beschrieben:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Im folgenden Beispiel werden Daten aus CSV-Dateien mit Trennzeichen und einem Header gelesen:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Sie können das schema
verwenden, um das Format manuell anzugeben. Sie müssen das schema
für Formate angeben, die keine Schemarückschlüsse unterstützen:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Hinweis
Delta Live Tables konfiguriert und verwaltet die Schema- und Prüfpunktverzeichnisse automatisch, wenn der Autoloader zum Lesen von Dateien verwendet wird. Wenn Sie jedoch eines dieser Verzeichnisse manuell konfigurieren, wirkt sich eine vollständige Aktualisierung nicht auf den Inhalt der konfigurierten Verzeichnisse aus. Databricks empfiehlt die Verwendung der automatisch konfigurierten Verzeichnisse, um unerwartete Nebenwirkungen bei der Verarbeitung zu vermeiden.