Sdílet prostřednictvím


Konfigurace odvození schématu a vývoje v automatickém zavaděči

Automatický zavaděč můžete nakonfigurovat tak, aby automaticky rozpoznal schéma načtených dat, což umožňuje inicializovat tabulky bez explicitního deklarování schématu dat a vyvíjet schéma tabulky při zavádění nových sloupců. To eliminuje potřebu ručního sledování a použití změn schématu v průběhu času.

Auto Loader může také "zachránit" data, která byla neočekávaná (například u různých datových typů) ve sloupci JSON blob, ke kterým můžete později přistupovat pomocí polostrukturovaných rozhraní API pro přístup k datům.

Pro odvozování a vývoj schématu se podporují následující formáty:

formát souboru Podporované verze
JSON Všechny verze
CSV Všechny verze
XML Databricks Runtime 14.3 LTS a vyšší
Avro Databricks Runtime 10.4 LTS a vyšší
Parquet Databricks Runtime 11.3 LTS a vyšší
ORC Nepodporované
Text Nepoužitelné (pevné schéma)
Binaryfile Nepoužitelné (pevné schéma)

Syntaxe pro odvozování a vývoj schématu

Určení cílového adresáře pro možnost cloudFiles.schemaLocation umožňuje odvozování a vývoj schématu. Můžete použít stejný adresář, který zadáte pro checkpointLocation. Pokud používáte DLT, Azure Databricks spravuje umístění schématu a další informace o kontrolních bodech automaticky.

Poznámka:

Pokud do cílové tabulky načítáte více než jedno zdrojové umístění dat, každá úloha zpracování Auto Loader vyžaduje samostatný kontrolní bod streamování.

Následující příklad používá parquet pro cloudFiles.format. Použijte csv, avronebo json pro jiné zdroje souborů. Všechna ostatní nastavení pro čtení a zápis zůstávají stejná pro výchozí chování jednotlivých formátů.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Jak funguje automatické odvozování schématu u Auto Loaderu?

Pro odvození schématu při prvním čtení dat auto loader vzorkuje prvních 50 GB nebo 1000 souborů, které zjistí, podle toho, který limit je překročen jako první. Auto Loader ukládá informace o schématu v adresáři _schemas nakonfigurované cloudFiles.schemaLocation tak, aby sledoval změny schématu vstupních dat v průběhu času.

Poznámka:

Pokud chcete změnit velikost použité ukázky, můžete nastavit konfigurace SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(bajtový řetězec, například 10gb)

a

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(celé číslo)

Ve výchozím nastavení se při odvozování schématu automatického zavaděče snaží vyhnout problémům s vývojem schématu kvůli neshodám typů. U formátů, které nekódují datové typy (JSON, CSV a XML), auto loader odvodí všechny sloupce jako řetězce (včetně vnořených polí v souborech JSON). U formátů se zadaným schématem (Parquet a Avro) auto loader vzorkuje podmnožinu souborů a slučuje schémata jednotlivých souborů. Toto chování je shrnuto v následující tabulce:

Formát souboru Výchozí odvozený datový typ
JSON řetězec
CSV řetězec
XML řetězec
Avro Typy kódované ve schématu Avro
Parquet Typy kódované ve schématu Parquet

Apache Spark DataFrameReader používá pro odvozování schématu různé chování, výběr datových typů pro sloupce ve zdrojích JSON, CSV a XML na základě ukázkových dat. Chcete-li toto chování povolit pomocí Auto Loaderu, nastavte možnost cloudFiles.inferColumnTypes na true.

Poznámka:

Při odvozování schématu pro data ve formátu CSV Auto Loader předpokládá, že soubory obsahují záhlaví. Pokud soubory CSV neobsahují záhlaví, zadejte možnost .option("header", "false"). Kromě toho Auto Loader slučuje schémata všech vzorových souborů, aby vzniklo globální schéma. Auto Loader pak může číst každý soubor podle záhlaví a správně parsovat CSV.

Poznámka:

Pokud sloupec obsahuje různé datové typy ve dvou souborech Parquet, automatický zavaděč vybere nejširší typ. Tuto volbu můžete přepsat pomocí funkce schemaHints . Když zadáte rady schématu, Auto Loader nepřetypuje sloupec na zadaný typ, ale řekne čtečce Parquet, aby sloupec četla jako zadaný typ. V případě neshody se sloupec zachrání ve zachráněném datovém sloupci.

Jak funguje vývoj schématu Auto Loaderu?

Auto Loader zjistí přidání nových sloupců při zpracování dat. Když Auto Loader zjistí nový sloupec, datový proud se zastaví s UnknownFieldException. Než váš datový proud vyvolá tuto chybu, Auto Loader provede odvozování schématu na nejnovější mikrodávce dat a aktualizuje lokaci schématu pomocí nejnovějšího schématu sloučením nových sloupců na konec schématu. Datové typy existujících sloupců zůstávají beze změny.

Databricks doporučuje nakonfigurovat Auto Loader streamy pomocí Databricks Jobs, tak aby se po takových změnách schématu automaticky restartovaly.

Auto Loader podporuje pro vývoj schématu následující režimy, které jste nastavili v možnosti cloudFiles.schemaEvolutionMode:

Režim Chování při čtení nového sloupce
addNewColumns (výchozí) Stream selhává. Do schématu se přidají nové sloupce. Existující sloupce nemění datové typy.
rescue Neprobíhají žádné změny ve schématu a přenos dat se nezastaví kvůli změnám ve schématu. Všechny nové sloupce jsou zaznamenány ve sloupci zachráněných dat.
failOnNewColumns Stream nefunguje. Stream se nerestartuje, pokud není aktualizováno poskytnuté schéma nebo odstraněn problemový datový soubor.
none Nevyvíjí schéma, nové sloupce se ignorují a data se nezachovají, pokud není nastavená rescuedDataColumn možnost. Stream neselže kvůli změnám schématu.

Poznámka:

addNewColumns režim je výchozí, pokud schéma není zadané, ale none je výchozí při zadání schématu. addNewColumns není povoleno, pokud je poskytnuto schéma pro stream, ale funguje, pokud zadáte své schéma jako nápovědu schématu .

Jak funguje rozdělení v Auto Loaderu?

Automatický zavaděč se pokusí odvodit sloupce oddílů ze základní adresářové struktury dat, pokud jsou data rozložená v dělení stylu Hive. Například cesta k souboru base_path/event=click/date=2021-04-01/f0.json vede k tomu, že jsou date a event odvozeny jako sloupce oddílů. Pokud základní adresářová struktura obsahuje konfliktní oddíly Hive nebo neobsahuje dělení stylu Hive, sloupce oddílů se ignorují.

Binární soubor (binaryFile) a text formáty souborů mají pevná schémata dat, ale podporují odvozování sloupců oddílů. Databricks doporučuje nastavit cloudFiles.schemaLocation pro tyto formáty souborů. Tím se zabrání případným chybám nebo ztrátě informací a zabrání nutnosti odvozovat sloupce oddílů při každém spuštění Auto Loaderu.

Sloupce pro rozdělení nejsou zohledněny při evoluci schématu. Pokud jste měli počáteční adresářovou strukturu jako base_path/event=click/date=2021-04-01/f0.json, a pak začněte přijímat nové soubory jako base_path/event=click/date=2021-04-01/hour=01/f1.json, Automatický Zavaděč ignoruje sloupec hodin. Pokud chcete zaznamenat informace pro nové sloupce oddílů, nastavte cloudFiles.partitionColumns na event,date,hour.

Poznámka:

Tato možnost cloudFiles.partitionColumns přebírá čárkami oddělený seznam názvů sloupců. Parsují se jenom sloupce, které existují jako key=value páry ve vaší adresářové struktuře.

Co je sloupec obnovených dat?

Když Auto Loader odvodí schéma, do vašeho schématu se automaticky přidá záchranný datový sloupec jako _rescued_data. Sloupec můžete přejmenovat nebo zahrnout v případech, kdy zadáte schéma nastavením možnosti rescuedDataColumn.

Sloupec zachráněných dat zajišťuje, aby se sloupce, které se neshodují se schématem, zachrání místo vyřazení. Sloupec zachráněných dat obsahuje všechna data, která nejsou analyzována z následujících důvodů:

  • Ve schématu chybí sloupec.
  • Neshody typů
  • Neshody velkých a malých písmen.

Sloupec zachráněných dat obsahuje JSON obsahující záchranné sloupce a cestu ke zdrojovému souboru záznamu.

Poznámka:

Analyzátory JSON a CSV podporují při analýze záznamů tři režimy: PERMISSIVE, DROPMALFORMEDa FAILFAST. Při použití společně s datovým rescuedDataColumntypem neshody nezpůsobí vyřazení záznamů v DROPMALFORMED režimu nebo vyvolání chyby v FAILFAST režimu. Only poškozené záznamy jsou odstraněny nebo způsobí chyby, jako například neúplné nebo poškozené soubory JSON nebo CSV. Pokud použijete badRecordsPath při analýze FORMÁTU JSON nebo CSV, neshody datových typů se při použití objektu rescuedDataColumn. nepovažují za chybné záznamy . Pouze neúplné a malformované záznamy JSON nebo CSV jsou uloženy v badRecordsPath.

Změna chování rozlišujícího malá a velká písmena

Pokud není povolena citlivost písmen, sloupce abc, Abc a ABC jsou považovány za stejný sloupec pro účely odvození schématu. Zvolený případ je libovolný a závisí na vzorkovaných datech. Pomocí nápověd schématu můžete určit, který případe by měl být použit. Po provedení výběru a odvození schématu automatický zavaděč nebere v úvahu varianty velikostí písmen, které nebyly vybrány v souladu se schématem.

Pokud je povolený záchranný datový sloupec, načtou se pole pojmenovaná jinak než podle schématu do sloupce _rescued_data. Toto chování změňte nastavením možnosti readerCaseSensitive na false, v takovém případě Auto Loader čte data bez rozlišování velkých a malých písmen.

Přepsání odvození schématu pomocí tipů schématu

Pomocí nápovědu ke schématu můžete vynutit informace o schématu, které znáte a očekáváte u odvozeného schématu. Pokud víte, že sloupec je konkrétní datový typ, nebo pokud chcete zvolit obecnější datový typ (například double místo integer), můžete zadat libovolný počet tipů pro datové typy sloupců jako řetězec pomocí syntaxe specifikace schématu SQL, například:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Seznam podporovaných datových typů najdete v dokumentaci k datovým typům .

Pokud sloupec není na začátku datového proudu, můžete také použít náznaky schématu k přidání tohoto sloupce do odvozeného schématu.

Tady je příklad odvozeného schématu pro zobrazení chování s nápovědou schématu.

Odvozené schéma:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Zadáním následujících tipů schématu:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

získáte:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Poznámka:

Podpora nápovědy pro schéma polí a map je dostupná v Databricks Runtime 9.1 LTS a novějších.

Tady je příklad odvozeného schématu se složitými datovými typy, abyste viděli chování pomocí tipů schématu.

Odvozené schéma:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Zadáním následujících tipů schématu:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

získáte:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Poznámka:

Nápovědy schématu se používají pouze v případě, že nezadáte schéma Auto Loaderu. Můžete použít nápovědu schématu, jestli cloudFiles.inferColumnTypes je povolená nebo zakázaná.