Konfigurace odvození schématu a vývoje v automatickém zavaděči
Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.
Auto Loader může také "zachránit" data, která byla neočekávaná (například u různých datových typů) ve sloupci JSON blob, ke kterým můžete později přistupovat pomocí polostrukturovaných rozhraní API pro přístup k datům.
Pro odvozování a vývoj schématu se podporují následující formáty:
formát souboru | Podporované verze |
---|---|
JSON |
Všechny verze |
CSV |
Všechny verze |
XML |
Databricks Runtime 14.3 LTS a vyšší |
Avro |
Databricks Runtime 10.4 LTS a vyšší |
Parquet |
Databricks Runtime 11.3 LTS a vyšší |
ORC |
Nepodporované |
Text |
Nepoužitelné (pevné schéma) |
Binaryfile |
Nepoužitelné (pevné schéma) |
Syntaxe pro odvozování a vývoj schématu
Určení cílového adresáře pro možnost cloudFiles.schemaLocation
umožňuje odvozování a vývoj schématu. Můžete použít stejný adresář, který zadáte pro checkpointLocation
. Pokud používáte DLT, Azure Databricks spravuje umístění schématu a další informace o kontrolních bodech automaticky.
Poznámka:
Pokud do cílové tabulky načítáte více než jedno zdrojové umístění dat, každá úloha zpracování Auto Loader vyžaduje samostatný kontrolní bod streamování.
Následující příklad používá parquet
pro cloudFiles.format
. Použijte csv
, avro
nebo json
pro jiné zdroje souborů. Všechna ostatní nastavení pro čtení a zápis zůstávají stejná pro výchozí chování jednotlivých formátů.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Jak funguje automatické odvozování schématu u Auto Loaderu?
Pro odvození schématu při prvním čtení dat auto loader vzorkuje prvních 50 GB nebo 1000 souborů, které zjistí, podle toho, který limit je překročen jako první. Auto Loader ukládá informace o schématu v adresáři _schemas
nakonfigurované cloudFiles.schemaLocation
tak, aby sledoval změny schématu vstupních dat v průběhu času.
Poznámka:
Pokud chcete změnit velikost použité ukázky, můžete nastavit konfigurace SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(bajtový řetězec, například 10gb
)
a
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(celé číslo)
Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce (včetně vnořených polí v souborech JSON). U formátů se zadaným schématem (Parquet a Avro) auto loader vzorkuje podmnožinu souborů a slučuje schémata jednotlivých souborů. Toto chování je shrnuto v následující tabulce:
Formát souboru | Výchozí odvozený datový typ |
---|---|
JSON |
řetězec |
CSV |
řetězec |
XML |
řetězec |
Avro |
Typy kódované ve schématu Avro |
Parquet |
Typy kódované ve schématu Parquet |
Apache Spark DataFrameReader používá pro odvozování schématu různé chování, výběr datových typů pro sloupce ve zdrojích JSON, CSV a XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí Auto Loaderu, nastavte možnost cloudFiles.inferColumnTypes
na true
.
Poznámka:
Při odvozování schématu pro data ve formátu CSV Auto Loader předpokládá, že soubory obsahují záhlaví. Pokud soubory CSV neobsahují záhlaví, zadejte možnost .option("header", "false")
. Kromě toho Auto Loader slučuje schémata všech vzorových souborů, aby vzniklo globální schéma. Auto Loader pak může číst každý soubor podle záhlaví a správně parsovat CSV.
Poznámka:
Pokud sloupec obsahuje různé datové typy ve dvou souborech Parquet, automatický zavaděč vybere nejširší typ. Tuto volbu můžete přepsat pomocí funkce schemaHints . Když zadáte rady schématu, Auto Loader nepřetypuje sloupec na zadaný typ, ale řekne čtečce Parquet, aby sloupec četla jako zadaný typ. V případě neshody se sloupec zachrání ve zachráněném datovém sloupci.
Jak funguje vývoj schématu Auto Loaderu?
Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví s UnknownFieldException
. Než váš datový proud vyvolá tuto chybu, Auto Loader provede odvozování schématu na nejnovější mikrodávce dat a aktualizuje lokaci schématu pomocí nejnovějšího schématu sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny.
Databricks doporučuje nakonfigurovat Auto Loader streamy pomocí Databricks Jobs, tak aby se po takových změnách schématu automaticky restartovaly.
Auto Loader podporuje pro vývoj schématu následující režimy, které jste nastavili v možnosti cloudFiles.schemaEvolutionMode
:
Poznámka:
addNewColumns
režim je výchozí, pokud schéma není zadané, ale none
je výchozí při zadání schématu.
addNewColumns
není povoleno, pokud je poskytnuto schéma pro stream, ale funguje, pokud zadáte své schéma jako nápovědu schématu .
Jak funguje rozdělení v Auto Loaderu?
Automatický zavaděč se pokusí odvodit sloupce oddílů ze základní adresářové struktury dat, pokud jsou data rozložená v dělení stylu Hive. Například cesta k souboru base_path/event=click/date=2021-04-01/f0.json
vede k tomu, že jsou date
a event
odvozeny jako sloupce oddílů. Pokud základní adresářová struktura obsahuje konfliktní oddíly Hive nebo neobsahuje dělení stylu Hive, sloupce oddílů se ignorují.
Binární soubor (binaryFile
) a text
formáty souborů mají pevná schémata dat, ale podporují odvozování sloupců oddílů. Databricks doporučuje nastavit cloudFiles.schemaLocation
pro tyto formáty souborů. Tím se zabrání případným chybám nebo ztrátě informací a zabrání nutnosti odvozovat sloupce oddílů při každém spuštění Auto Loaderu.
Sloupce pro rozdělení nejsou zohledněny při evoluci schématu. Pokud jste měli počáteční adresářovou strukturu jako base_path/event=click/date=2021-04-01/f0.json
, a pak začněte přijímat nové soubory jako base_path/event=click/date=2021-04-01/hour=01/f1.json
, Automatický Zavaděč ignoruje sloupec hodin. Pokud chcete zaznamenat informace pro nové sloupce oddílů, nastavte cloudFiles.partitionColumns
na event,date,hour
.
Poznámka:
Tato možnost cloudFiles.partitionColumns
přebírá čárkami oddělený seznam názvů sloupců. Parsují se jenom sloupce, které existují jako key=value
páry ve vaší adresářové struktuře.
Co je sloupec obnovených dat?
Když Auto Loader odvodí schéma, do vašeho schématu se automaticky přidá záchranný datový sloupec jako _rescued_data
. Sloupec můžete přejmenovat nebo zahrnout v případech, kdy zadáte schéma nastavením možnosti rescuedDataColumn
.
Sloupec zachráněných dat zajišťuje, aby se sloupce, které se neshodují se schématem, zachrání místo vyřazení. Sloupec zachráněných dat obsahuje všechna data, která nejsou analyzována z následujících důvodů:
- Ve schématu chybí sloupec.
- Neshody typů
- Neshody velkých a malých písmen.
Sloupec zachráněných dat obsahuje JSON obsahující záchranné sloupce a cestu ke zdrojovému souboru záznamu.
Poznámka:
Analyzátory JSON a CSV podporují při analýze záznamů tři režimy: PERMISSIVE
, DROPMALFORMED
a FAILFAST
. Při použití společně s datovým rescuedDataColumn
typem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED
režimu nebo vyvolání chyby v FAILFAST
režimu. Only poškozené záznamy jsou odstraněny nebo způsobí chyby, jako například neúplné nebo poškozené soubory JSON nebo CSV. Pokud použijete badRecordsPath
při analýze FORMÁTU JSON nebo CSV, neshody datových typů se při použití objektu rescuedDataColumn
. nepovažují za chybné záznamy . Pouze neúplné a malformované záznamy JSON nebo CSV jsou uloženy v badRecordsPath
.
Změna chování rozlišujícího malá a velká písmena
Pokud není povolena citlivost písmen, sloupce abc
, Abc
a ABC
jsou považovány za stejný sloupec pro účely odvození schématu. Zvolený případ je libovolný a závisí na vzorkovaných datech. Pomocí nápověd schématu můžete určit, který případe by měl být použit. Po provedení výběru a odvození schématu automatický zavaděč nebere v úvahu varianty velikostí písmen, které nebyly vybrány v souladu se schématem.
Pokud je povolený záchranný datový sloupec, načtou se pole pojmenovaná jinak než podle schématu do sloupce _rescued_data
. Toto chování změňte nastavením možnosti readerCaseSensitive
na false, v takovém případě Auto Loader čte data bez rozlišování velkých a malých písmen.
Přepsání odvození schématu pomocí tipů schématu
Pomocí nápovědu ke schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například double
místo integer
), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL, například:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Seznam podporovaných datových typů najdete v dokumentaci k datovým typům .
Pokud sloupec není na začátku datového proudu, můžete také použít náznaky schématu k přidání tohoto sloupce do odvozeného schématu.
Tady je příklad odvozeného schématu pro zobrazení chování s nápovědou schématu.
Odvozené schéma:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Zadáním následujících tipů schématu:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
získáte:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Poznámka:
Podpora nápovědy pro schéma polí a map je dostupná v Databricks Runtime 9.1 LTS a novějších.
Tady je příklad odvozeného schématu se složitými datovými typy, abyste viděli chování pomocí tipů schématu.
Odvozené schéma:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Zadáním následujících tipů schématu:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
získáte:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Poznámka:
Nápovědy schématu se používají pouze v případě, že nezadáte schéma Auto Loaderu. Můžete použít nápovědu schématu, jestli cloudFiles.inferColumnTypes
je povolená nebo zakázaná.