Schemarückschluss und -entwicklung in Auto Loader konfigurieren
Sie können den Autoloader so konfigurieren, dass das Schema der geladenen Daten automatisch erkannt wird, was es Ihnen erlaubt, Tabellen zu initialisieren, ohne das Datenschema explizit zu deklarieren und das Tabellenschema zu entwickeln, wenn neue Spalten eingeführt werden. Dadurch wird die Notwendigkeit beseitigt, Schemaänderungen im Laufe der Zeit manuell nachverfolgen und anwenden zu müssen.
Autoloader kann auch unerwartete Daten (z. B. von unterschiedlichen Datentypen) in einer JSON-Blob-Spalte „retten“, auf die Sie später über die APIs für den semistrukturierten Datenzugriff zugreifen können.
Die folgenden Formate werden für Schemarückschlüssen und Evolution unterstützt:
Dateiformat | Unterstützte Versionen |
---|---|
JSON |
Alle Versionen |
CSV |
Alle Versionen |
XML |
Databricks Runtime 14.3 LTS und höher |
Avro |
Databricks Runtime 10.4 LTS und höher |
Parquet |
Databricks Runtime 11.3 LTS und höher |
ORC |
Nicht unterstützt |
Text |
Nicht anwendbar (festes Schema) |
Binaryfile |
Nicht anwendbar (festes Schema) |
Syntax für Schemainferenz und -entwicklung
Wenn Sie ein Zielverzeichnis für die Option cloudFiles.schemaLocation
angeben, ermöglicht dies Schemarückschlüsse und -entwicklung. Sie können dasselbe Verzeichnis verwenden, das Sie für checkpointLocation
angeben. Wenn Sie Delta Live Tables verwenden, verwaltet Azure Databricks den Schemaspeicherort und andere Prüfpunktinformationen automatisch.
Hinweis
Wenn Sie mehr als einen Quelldatenspeicherort haben, der in die Zieltabelle geladen wird, erfordert jede Autoloader-Erfassungsworkload einen separaten Streamingprüfpunkt.
Das folgende Beispiel verwendet parquet
für die cloudFiles.format
. Verwenden Sie csv
, avro
oder json
für andere Dateiquellen. Alle anderen Einstellungen zum Lesen und Schreiben bleiben für das Standardverhalten für jedes Format gleich.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Funktionsweise des Autoloader-Schemarückschlusses
Um beim ersten Laden von Daten auf das Schema zurückzuschließen, zieht der Autoloader die ersten 50 GB oder 1000 Dateien heran, die er entdeckt, je nachdem, welche Grenze zuerst überschritten wird. Der Autoloader speichert die Schemainformationen in dem Verzeichnis _schemas
am konfigurierten cloudFiles.schemaLocation
, um Schemaänderungen an den Eingabedaten im Laufe der Zeit nachzuverfolgen.
Hinweis
Um die Größe der verwendeten Stichprobe zu ändern, können Sie folgende SQL-Konfigurationen festlegen:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(Bytezeichenfolge, z. B. 10gb
)
and
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(Integer)
Standardmäßig versucht der Autoloader-Schemarückschluss, Schemaentwicklungsprobleme aufgrund von Typkonflikten zu vermeiden. Für Formate, die Datentypen (JSON, CSV und XML) nicht codieren, schließt Autoloader alle Spalten als Zeichenfolgen rück (einschließlich geschachtelter Felder in JSON-Dateien). Bei Formaten mit typisiertem Schema (Parquet und Avro) nimmt Autoloader eine Teilmenge von Dateien als Stichprobe und führt die Schemas einzelner Dateien zusammen. Dieses Verhalten wird in der folgenden Tabelle zusammengefasst:
Dateiformat | Standardmäßig rückgeschlossener Datentyp |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Im Avro-Schema codierte Typen |
Parquet |
Im Parquet-Schema codierte Typen |
Der Apache Spark DataFrameReader verwendet verschiedene Verhalten für Schemarückschlüsse, wobei er Datentypen für Spalten in JSON-, CSV- und XML-Quellen auf Grundlage von Beispieldaten auswählt. Um dieses Verhalten im Autoloader zu aktivieren, legen Sie die Option cloudFiles.inferColumnTypes
auf true
fest.
Hinweis
Beim Ableiten des Schemas für CSV-Daten geht der Autoloader davon aus, dass die Dateien Header enthalten. Wenn Ihre CSV-Dateien keine Header enthalten, geben Sie die Option .option("header", "false")
an. Darüber hinaus führt der Autoloader die Schemas aller Dateien im Beispiel zusammen, um ein globales Schema zu erstellen. Anschließend kann der Autoloader jede Datei entsprechend ihrem Header lesen und die CSV-Daten ordnungsgemäß analysieren.
Hinweis
Wenn eine Spalte unterschiedliche Datentypen in zwei Parquetdateien aufweist, wählt Auto Loader den breitesten Typ aus. Sie können schemaHints verwenden, um diese Auswahl außer Kraft zu setzen. Wenn Sie Schema-Hinweise angeben, wandelt Auto Loader die Spalte nicht in den angegebenen Typ um, sondern weist den Parquet-Reader an, die Spalte als den angegebenen Typ zu lesen. Im Falle eines Konflikts wird die Spalte in der geretteten Datenspaltegerettet.
Funktionsweise der Autoloader-Schemaentwicklung
Autoloader erkennt das Hinzufügen neuer Spalten, während er Ihre Daten verarbeitet. Wenn Autoloader eine neue Spalte erkennt, wird der Stream mit einer UnknownFieldException
beendet. Bevor Ihr Stream diesen Fehler auslöst, führt Autoloader einen Schemarückschluss für den letzten Mikrobatch von Daten durch und aktualisiert den Schemaspeicherort mit dem neuesten Schema, indem neue Spalten am Ende des Schemas zusammengeführt werden. Die Datentypen vorhandener Spalten bleiben unverändert.
Databricks empfiehlt, Autoloaderstreams mit Databricks-Aufträgen so zu konfigurieren, dass sie nach solchen Schemaänderungen automatisch neu gestartet werden.
Autoloader unterstützt die folgenden Modi für die Schemaentwicklung, die Sie in der Option cloudFiles.schemaEvolutionMode
einstellen:
Mode | Verhalten beim Lesen neuer Spalten |
---|---|
addNewColumns (Standardwert) |
Stream schlägt fehl. Dem Schema werden neue Spalten hinzugefügt. Vorhandene Spalten erhalten keine Datentypen. |
rescue |
Das Schema wird nie entwickelt, und der Stream schlägt aufgrund von Schemaänderungen nicht fehl. Alle neuen Spalten werden in der „rescued data“-Spalte aufgezeichnet. |
failOnNewColumns |
Stream schlägt fehl. Der Stream wird erst dann neu gestartet, wenn das angegebene Schema aktualisiert oder die problematische Datendatei entfernt wurde. |
none |
Das Schema wird nicht entwickelt, neue Spalten werden ignoriert, und die Daten werden nicht gerettet, es sei denn, die Option rescuedDataColumn ist festgelegt. Der Stream schlägt aufgrund von Schemaänderungen nicht fehl. |
Funktionsweise von Partitionen mit Autoloader
Autoloader versucht, Partitionsspalten aus der zugrunde liegenden Verzeichnisstruktur der Daten abzuleiten, wenn die Daten im Hive-Stil partitioniert sind. Zum Beispiel führt der Dateipfad base_path/event=click/date=2021-04-01/f0.json
dazu, dass date
und event
als Partitionsspalten rückgeschlossen werden. Wenn die zugrunde liegende Verzeichnisstruktur widersprüchliche Hive-Partitionen enthält oder keine Partitionierung im Hive-Stil aufweist, werden Partitionsspalten ignoriert.
Binärdateiformate (binaryFile
) und text
-Dateiformate weisen feste Datenschemata auf, unterstützen aber auch den Rückschluss auf Partitionsspalten. Databricks empfiehlt die Festlegung von cloudFiles.schemaLocation
für diese Dateiformate. Dadurch werden potenzielle Fehler oder Informationsverluste vermieden und der Rückschluss von Partitionsspalten bei jedem Start eines Autoloaders verhindert.
Partitionsspalten werden bei der Schemaentwicklung nicht berücksichtigt. Wenn eine anfängliche Verzeichnisstruktur wie base_path/event=click/date=2021-04-01/f0.json
vorlag und dann neue Dateien als base_path/event=click/date=2021-04-01/hour=01/f1.json
empfangen werden, ignoriert Autoloader die Spalte „Stunde“. Um Informationen für neue Partitionsspalten zu erfassen, legen Sie cloudFiles.partitionColumns
auf event,date,hour
fest.
Hinweis
Die Option cloudFiles.partitionColumns
akzeptiert eine durch Trennzeichen getrennten Liste von Spaltennamen. Nur Spalten, die als key=value
-Paare in Ihrer Verzeichnisstruktur vorhanden sind, werden analysiert.
Was ist die Spalte „rescued data“ (gerettete Daten)?
Wenn Autoloader das Schema ableitet, wird dem Schema automatisch eine Spalte für gerettete Daten als _rescued_data
hinzugefügt. Sie können die Spalte umbenennen oder sie in solchen Fällen einbeziehen, in denen Sie ein Schema durch Festlegen der Option rescuedDataColumn
bereitstellen.
Die „rescued data“-Spalte stellt sicher, dass Spalten, die nicht mit dem Schema übereinstimmen, gerettet werden, anstatt gelöscht zu werden. Die Spalte „rescued data“ enthält alle Daten, die aus folgenden Gründen nicht analysiert werden:
- Die Spalte fehlt im Schema.
- Typkonflikte.
- Groß-/Kleinschreibungskonflikte.
Die „rescued data“-Spalte enthält ein JSON-Blob, das die geretteten Spalten und den Quelldateipfad des Datensatzes enthält.
Hinweis
Die JSON- und CSV-Parser unterstützen drei Modi beim Analysieren von Datensätzen: PERMISSIVE
, DROPMALFORMED
und FAILFAST
. Bei Verwendung mit rescuedDataColumn
führen Datentypkonflikte nicht dazu, dass Datensätze im Modus DROPMALFORMED
gelöscht werden oder im Modus FAILFAST
einen Fehler auslösen. Nur beschädigte Datensätze werden verworfen oder Fehler ausgegeben, z. B. unvollständige oder fehlerhafte JSON- oder CSV-Dateien. Wenn Sie beim Analysieren von JSON- oder CSV-Daten badRecordsPath
verwenden, werden Datentypkonflikte bei Verwendung von rescuedDataColumn
nicht als fehlerhafte Datensätze betrachtet. Nur unvollständige und falsch formatierte JSON- oder CSV-Datensätze werden in badRecordsPath
gespeichert.
Ändern des Verhaltens zur Beachtung der Groß-/Kleinschreibung
Wenn die Beachtung der Groß- und Kleinschreibung nicht aktiviert ist, werden die Spalten abc
, Abc
und ABC
für den Schemarückschluss als dieselbe Spalte betrachtet. Der gewählte Fall ist willkürlich und hängt von den abgetasteten Daten ab. Mithilfe von Schemahinweisen können Sie die zu verwendende Schreibweise erzwingen. Nachdem eine Auswahl getroffen und das Schema abgeleitet wurde, berücksichtigt Auto Loader nicht die Hüllenvarianten, die nicht im Einklang mit dem Schema ausgewählt wurden.
Wenn Spalte für gerettete Daten aktiviert ist, werden Felder, die in einer anderen Schreibweise (bezogen auf Groß- und Kleinschreibung) benannt sind als das Schema, in die Spalte _rescued_data
geladen. Sie können dieses Verhalten ändern, indem Sie die Option readerCaseSensitive
auf „False“ festlegen. In diesem Fall werden vom Autoloader Daten gelesen, ohne dass zwischen Groß- und Kleinschreibung unterschieden wird.
Außerkraftsetzung des Schemarückschlusses mit Schemahinweisen
Sie können Schemahinweise verwenden, um die Schemainformationen zu erzwingen, die Sie kennen und für ein rückgeschlossenes Schema erwarten. Wenn Sie wissen, dass eine Spalte einen bestimmten Datentyp hat, oder wenn Sie einen noch allgemeineren Datentyp wählen möchten (z. B. double
statt integer
), können Sie eine beliebige Anzahl von Hinweisen für die Datentypen von Spalten als eine Zeichenfolge mithilfe der SQL-Schemaspezifikationssyntax wie folgt angeben:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Eine Liste der unterstützten Datentypen finden Sie in der Dokumentation zu Datentypen.
Wenn eine Spalte am Anfang des Streams nicht vorhanden ist, können Sie auch Schemahinweise verwenden, um diese Spalte dem abgeleiteten Schema hinzuzufügen.
Hier ist ein Beispiel für ein abgeleitetes Schema, um das Verhalten mit Schemahinweisen zu veranschaulichen.
Abgeleitetes Schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Durch Angeben der folgenden Schemahinweise:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
erhalten Sie:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Hinweis
Die Unterstützung von Array- und Zuordnungsschemahinweisen ist in Databricks Runtime 9.1 LTS und höher verfügbar.
Hier ist ein Beispiel für ein abgeleitetes Schema mit komplexen Datentypen, um das Verhalten mit Schemahinweisen zu veranschaulichen.
Abgeleitetes Schema:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Durch Angeben der folgenden Schemahinweise:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
erhalten Sie:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Hinweis
Schemahinweise werden nur verwendet, wenn Sie dem Autoloader kein Schema bereitstellen. Sie können Schemahinweise verwenden, unabhängig davon, ob cloudFiles.inferColumnTypes
aktiviert oder deaktiviert ist.