Konfigurera schemainferens och utveckling i Auto Loader
Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid.
Automatisk inläsare kan också "rädda" data som var oväntade (till exempel av olika datatyper) i en JSON-blobkolumn, som du kan välja att komma åt senare med hjälp av api:erna för halvstrukturerad dataåtkomst.
Följande format stöds för schemainferens och utveckling:
File format | Versioner som stöds |
---|---|
JSON |
Alla versioner |
CSV |
Alla versioner |
XML |
Databricks Runtime 14.3 LTS och senare |
Avro |
Databricks Runtime 10.4 LTS och senare |
Parquet |
Databricks Runtime 11.3 LTS och senare |
ORC |
Stöd saknas |
Text |
Ej tillämpligt (fast schema) |
Binaryfile |
Ej tillämpligt (fast schema) |
Syntax för schemainferens och utveckling
Om du anger en målkatalog för alternativet cloudFiles.schemaLocation
aktiveras schemainferens och utveckling. Du kan välja att använda samma katalog som du anger för checkpointLocation
. Om du använder Delta Live Tables hanterar Azure Databricks schemaplats och annan kontrollpunktsinformation automatiskt.
Kommentar
Om du har mer än en källdataplats som läses in i måltabellen kräver varje arbetsbelastning för automatisk inläsning en separat kontrollpunkt för direktuppspelning.
I följande exempel används parquet
för cloudFiles.format
. Använd csv
, avro
eller json
för andra filkällor. Alla andra inställningar för läsning och skrivning förblir desamma för standardbeteenden för varje format.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Hur fungerar schemainferens för automatisk inläsning?
För att härleda schemat vid första läsningen av data tar Auto Loader exempel på de första 50 GB eller 1 000 filer som identifieras, beroende på vilken gräns som överskrids först. Automatisk inläsning lagrar schemainformationen i en katalog _schemas
på den konfigurerade cloudFiles.schemaLocation
för att spåra schemaändringar i indata över tid.
Kommentar
Om du vill ändra storleken på det exempel som används kan du ange SQL-konfigurationerna:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(bytesträng, till exempel 10gb
)
och
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(heltal)
Som standard försöker schemainferensen för automatisk inläsning undvika problem med schemautveckling på grund av typmatchningar. För format som inte kodar datatyper (JSON, CSV och XML) härleder Auto Loader alla kolumner som strängar (inklusive kapslade fält i JSON-filer). För format med skrivet schema (Parquet och Avro) tar Auto Loader exempel på en delmängd av filer och sammanfogar scheman för enskilda filer. Det här beteendet sammanfattas i följande tabell:
File format | Standard härledd datatyp |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Typer som kodas i Avro-schema |
Parquet |
Typer som kodas i Parquet-schema |
Apache Spark DataFrameReader använder olika beteende för schemainferens och väljer datatyper för kolumner i JSON-, CSV- och XML-källor baserat på exempeldata. Om du vill aktivera det här beteendet med Auto Loader anger du alternativet cloudFiles.inferColumnTypes
till true
.
Kommentar
När schemat för CSV-data härleds förutsätter Auto Loader att filerna innehåller rubriker. Om dina CSV-filer inte innehåller rubriker anger du alternativet .option("header", "false")
. Dessutom sammanfogar Auto Loader scheman för alla filer i exemplet för att komma fram till ett globalt schema. Automatisk inläsare kan sedan läsa varje fil enligt rubriken och parsa CSV:en korrekt.
Kommentar
När en kolumn har olika datatyper i två Parquet-filer väljer Auto Loader den bredaste typen. Du kan använda schemaHints för att åsidosätta det här valet. När du anger schematips omvandlar autoinläsaren inte kolumnen till den angivna typen, utan uppmanar i stället Parquet-läsaren att läsa kolumnen som den angivna typen. Vid matchningsfel räddas kolumnen i den räddade datakolumnen.
Hur fungerar schemautvecklingen för automatisk inläsning?
Automatisk inläsning identifierar tillägg av nya kolumner när dina data bearbetas. När Auto Loader identifierar en ny kolumn stoppas strömmen med en UnknownFieldException
. Innan strömmen genererar det här felet utför Auto Loader schemainferens på den senaste mikrobatchen med data och uppdaterar schemaplatsen med det senaste schemat genom att slå samman nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade.
Databricks rekommenderar att du konfigurerar automatiska inläsningsströmmar med Databricks-jobb så att de startas om automatiskt efter sådana schemaändringar.
Auto Loader stöder följande lägen för schemautveckling, som du anger i alternativet cloudFiles.schemaEvolutionMode
:
Läge | Beteende vid läsning av ny kolumn |
---|---|
addNewColumns (standard) |
Strömmen misslyckas. Nya kolumner läggs till i schemat. Befintliga kolumner utvecklar inte datatyper. |
rescue |
Schemat har aldrig utvecklats och strömmen misslyckas inte på grund av schemaändringar. Alla nya kolumner registreras i den räddade datakolumnen. |
failOnNewColumns |
Strömmen misslyckas. Stream startar inte om om inte det angivna schemat uppdateras eller om den felaktiga datafilen tas bort. |
none |
Utvecklar inte schemat, nya kolumner ignoreras och data räddas inte om inte rescuedDataColumn alternativet har angetts. Stream misslyckas inte på grund av schemaändringar. |
Kommentar
addNewColumns
läge är standard när ett schema inte anges, men none
är standard när du anger ett schema.
addNewColumns
tillåts inte när ett schema för dataströmmen tillhandahålls, men fungerar om du anger ditt schema som ett schema-hint .
Hur fungerar partitioner med Auto Loader?
Auto loader försöker härleda partitionskolumner från den underliggande katalogstrukturen för data om data anges i Hive-formatpartitionering. Till exempel resulterar filsökvägen base_path/event=click/date=2021-04-01/f0.json
i slutsatsdragningen av date
och event
som partitionskolumner. Om den underliggande katalogstrukturen innehåller hive-partitioner i konflikt eller inte innehåller Hive-formatpartitionering ignoreras partitionskolumner.
Binära filer (binaryFile
) och text
filformat har fasta datascheman, men stöder inferens för partitionskolumner. Databricks rekommenderar inställning cloudFiles.schemaLocation
för dessa filformat. Detta undviker eventuella fel eller informationsförluster och förhindrar slutsatsdragning av partitionskolumner varje gång en automatisk inläsare börjar.
Partitionskolumner beaktas inte för schemautveckling. Om du hade en inledande katalogstruktur som base_path/event=click/date=2021-04-01/f0.json
, och sedan börjar ta emot nya filer som base_path/event=click/date=2021-04-01/hour=01/f1.json
, ignorerar Auto Loader timkolumnen. Om du vill samla in information om nya partitionskolumner anger du cloudFiles.partitionColumns
till event,date,hour
.
Kommentar
Alternativet cloudFiles.partitionColumns
tar en kommaavgränsad lista med kolumnnamn. Endast kolumner som finns som key=value
par i katalogstrukturen parsas.
Vad är den räddade datakolumnen?
När Auto Loader härleder schemat läggs automatiskt en räddad datakolumn till i schemat som _rescued_data
. Du kan byta namn på kolumnen eller inkludera den i fall där du anger ett schema genom att ange alternativet rescuedDataColumn
.
Den räddade datakolumnen säkerställer att kolumner som inte matchar schemat räddas i stället för att tas bort. Den räddade datakolumnen innehåller alla data som inte parsas av följande skäl:
- Kolumnen saknas i schemat.
- Typmatchningsfel.
- Skiftlägesfel.
Den räddade datakolumnen innehåller en JSON som innehåller de räddade kolumnerna och källfilens sökväg för posten.
Kommentar
JSON- och CSV-parsarna stöder tre lägen vid parsning av poster: PERMISSIVE
, DROPMALFORMED
och FAILFAST
. När de används tillsammans med rescuedDataColumn
, orsakar inte datatypsmatchningar att poster tas bort i DROPMALFORMED
läge eller utlöser ett fel i FAILFAST
läge. Endast skadade poster tas bort eller utlöser fel, till exempel ofullständig eller felaktigt JSON eller CSV. Om du använder badRecordsPath
när du parsar JSON eller CSV betraktas inte datatypsmatchningar som felaktiga poster när du använder rescuedDataColumn
. Endast ofullständiga och felaktiga JSON- eller CSV-poster lagras i badRecordsPath
.
Ändra skiftlägeskänsligt beteende
Om inte skiftlägeskänslighet är aktiverat betraktas kolumnerna abc
, Abc
och och ABC
som samma kolumn för schemainferens. Det valda fallet är godtyckligt och beror på exempeldata. Du kan använda schematips för att framtvinga vilket fall som ska användas. När ett val har gjorts och schemat har härledts tar Auto Loader inte hänsyn till de höljevarianter som inte har valts i enlighet med schemat.
När den räddade datakolumnen är aktiverad läses fält med namnet i ett annat fall än schemat in i _rescued_data
kolumnen. Ändra det här beteendet genom att ange alternativet readerCaseSensitive
till false, i vilket fall Auto Loader läser data på ett skiftlägesokänsligt sätt.
Åsidosätta schemainferens med schematips
Du kan använda schematips för att framtvinga schemainformationen som du känner till och förväntar dig på ett härledt schema. När du vet att en kolumn är av en specifik datatyp, eller om du vill välja en mer allmän datatyp (till exempel en double
i stället för en integer
), kan du ange ett godtyckligt antal tips för kolumndatatyper som en sträng med sql-schemaspecifikationssyntax, till exempel följande:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Se dokumentationen om datatyper för listan över datatyper som stöds.
Om en kolumn inte finns i början av strömmen kan du också använda schematips för att lägga till den kolumnen i det här schemat.
Här är ett exempel på ett härledt schema för att se beteendet med schematips.
Härledt schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Genom att ange följande schematips:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
du får:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Kommentar
Stöd för matris- och kartschematips finns i Databricks Runtime 9.1 LTS och senare.
Här är ett exempel på ett härledt schema med komplexa datatyper för att se beteendet med schematips.
Härledt schema:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Genom att ange följande schematips:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
du får:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Kommentar
Schematips används endast om du inte anger något schema för automatisk inläsning. Du kan använda schematips om cloudFiles.inferColumnTypes
det är aktiverat eller inaktiverat.