Configurare l'inferenza e l'evoluzione nel caricatore automatico schema
È possibile configurare il caricatore automatico per rilevare automaticamente il schema dei dati caricati, consentendo di inizializzare tables senza dichiarare esplicitamente il schema ed evolvere il tableschema man mano che vengono introdotti nuovi columns. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche schema nel tempo.
Il caricatore automatico può anche "salvare" dati imprevisti (ad esempio, di tipi di dati diversi) in un BLOB JSON column, a cui è possibile accedere in un secondo momento usando le API di accesso ai dati semistrutturati .
I seguenti formati sono supportati per l'inferenza e l'evoluzione schema.
Formato di file | Versioni supportate |
---|---|
JSON |
Tutte le versioni |
CSV |
Tutte le versioni |
XML |
Databricks Runtime 14.3 LTS e versioni successive |
Avro |
Databricks Runtime 10.4 LTS e versioni successive |
Parquet |
Databricks Runtime 11.3 LTS e versioni successive |
ORC |
Non supportato |
Text |
Non applicabile (fixed-schema) |
Binaryfile |
Non applicabile (fixed-schema) |
Sintassi inferenziale ed evolutiva per schema
Specificando una directory di destinazione per l'opzione cloudFiles.schemaLocation
, si abilita l'inferenza e l'evoluzione di schema. È possibile scegliere di usare la stessa directory specificata per checkpointLocation
. Se si usa Delta Live Tables, Azure Databricks gestisce automaticamente le altre informazioni sul checkpoint e la posizione schema.
Nota
Se sono state caricate più di un'origine dati nel target table, ogni processo di inserimento tramite Auto Loader richiede un checkpoint di streaming separato.
Nell'esempio seguente viene parquet
usato per .cloudFiles.format
Usare csv
, avro
o json
per altre origini file. Tutte le altre impostazioni per la lettura e la scrittura rimangono invariate per i comportamenti predefiniti per ogni formato.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Come funziona l'inferenza di Auto Loader schema?
Per dedurre il schema durante la prima lettura dei dati, il caricatore automatico esegue l'esempio dei primi 50 GB o 1000 file individuati, a qualsiasi limit venga superato per primo. Il caricatore automatico archivia le informazioni di schema in una directory _schemas
nel cloudFiles.schemaLocation
configurato per tenere traccia delle modifiche schema ai dati di input nel tempo.
Nota
Per modificare le dimensioni dell'esempio usato, è possibile set le configurazioni SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(stringa di byte, ad esempio 10gb
)
e
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(integer)
Per impostazione predefinita, l'inferenza del caricatore automatico schema cerca di evitare problemi di evoluzione schema a causa di mancate corrispondenze del tipo. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), il caricatore automatico deduce tutti i columns come stringhe (inclusi i campi annidati nei file JSON). Per i formati con schema tipizzato (Parquet e Avro), il caricatore automatico esegue il campionamento di un sottoinsieme di file e fonde gli schemi dei singoli file. Questo comportamento è riepilogato nei tableseguenti:
Formato di file | Tipo di dati dedotto predefinito |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Tipi codificati in Avro schema |
Parquet |
Tipi codificati in Parquet schema |
Apache Spark DataFrameReader utilizza un comportamento diverso per l'inferenza di schema, selezionando i tipi di dati per columns nelle origini JSON, CSV e XML basandosi sui dati di esempio. Per abilitare questo comportamento con il caricatore automatico, set l'opzione cloudFiles.inferColumnTypes
per true
.
Nota
Quando si deduce il schema per i dati CSV, il caricatore automatico presuppone che i file contengano intestazioni. Se i file CSV non contengono intestazioni, fornire l'opzione .option("header", "false")
. Inoltre, Auto Loader unisce gli schemi di tutti i file nel campione per ottenere uno schema globale schema. Il caricatore automatico può quindi leggere ogni file in base all'intestazione e analizzare correttamente il file CSV.
Nota
Quando un column ha tipi di dati diversi in due file Parquet, Auto Loader sceglie il tipo più ampio. È possibile usare schemaHints per eseguire l'override di questa scelta. Quando si specificano hint schema, Auto Loader non esegue il cast di column nel tipo specificato, ma indica al lettore Parquet di leggere column come corrispondente al tipo specificato. In caso di mancata corrispondenza, il column viene recuperato nei dati salvati column.
Come funziona l'evoluzione del caricatore automatico schema?
Il sistema di caricamento automatico rileva l'aggiunta di nuovi columns durante l'elaborazione dei tuoi dati. Quando l'Auto Loader rileva un nuovo column, il flusso si arresta con un UnknownFieldException
. Prima che il tuo flusso generi questo errore, Auto Loader esegue un'inferenza schema sul micro-batch di dati più recente e aggiorna la posizione schema con l'ultimo schema aggiungendo nuovi columns alla fine del schema. I tipi di dati di columns esistenti rimangono invariati.
Databricks consiglia di configurare i flussi del Caricatore Automatico con Jobs di Databricks per riavviarsi automaticamente dopo tali modifiche schema.
Il caricatore automatico supporta le seguenti modalità per l'evoluzione schema, che puoi set nell'opzione cloudFiles.schemaEvolutionMode
:
Modalità | Comportamento durante la lettura di nuovi column |
---|---|
addNewColumns (predefinito) |
Flusso non riesce. I nuovi columns vengono aggiunti al schema. I columns esistenti non modificano i tipi di dati. |
rescue |
Schema non viene mai evoluto e il flusso non fallisce a causa delle modifiche di schema. Tutte le nuove columns vengono registrate nei dati recuperati column. |
failOnNewColumns |
Flusso non riesce. Stream non si riavvia a meno che il schema specificato non venga aggiornato o il file di dati problematico venga rimosso. |
none |
Non si evolve il schema, le nuove columns vengono ignorate e i dati non vengono recuperati a meno che l'opzione rescuedDataColumn non sia set. Lo stream non fallisce a causa delle modifiche schema. |
Nota
La modalità addNewColumns
è quella predefinita quando non viene specificata una schema, ma la modalità none
viene utilizzata di default quando è fornita una schema.
addNewColumns
non è consentito quando viene fornito il schema del flusso, ma funziona se si specifica il schema come hint schema.
Come funzionano le partizioni con l'Autoloader?
Auto Loader tenta di dedurre partitioncolumns dalla struttura di directory sottostante dei dati se i dati sono disposti secondo il partizionamento in stile Hive. Ad esempio, il percorso del file base_path/event=click/date=2021-04-01/f0.json
comporta l'inferenza di date
e event
come partitioncolumns. Se la struttura di directory sottostante contiene partizioni Hive in conflitto o non contiene il partizionamento dello stile Hive, partitioncolumns vengono ignorati.
Il file binario (binaryFile
) e i formati di file text
hanno schemi di dati fissi, ma supportano l'inferenza partitioncolumn. Databricks consiglia di impostare cloudFiles.schemaLocation
per questi formati di file. In questo modo si evitano potenziali errori o perdite di informazioni e si impedisce l'inferenza delle partizioni columns ogni volta che inizia un caricatore automatico.
Partition
columns non sono considerati per l'evoluzione di schema. Se si dispone di una struttura di directory iniziale come base_path/event=click/date=2021-04-01/f0.json
e quindi di iniziare a ricevere nuovi file come base_path/event=click/date=2021-04-01/hour=01/f1.json
, Auto Loader ignora l'orario column. Per acquisire informazioni sui nuovi partitioncolumns, setcloudFiles.partitionColumns
fino a event,date,hour
.
Nota
L'opzione cloudFiles.partitionColumns
accetta un list delimitato da virgole di nomi di column. Solo i columns che esistono in coppie di key=value
nella tua struttura di directory vengono analizzati.
Quali sono i dati salvati column?
Quando il caricatore automatico deduce il schema, i dati salvati column vengono automaticamente aggiunti al schema come _rescued_data
. È possibile rinominare il column o includerlo nei casi where si fornisce un schema impostando l'opzione rescuedDataColumn
.
I dati salvati column assicurano che columns che non corrispondono al schema vengano salvati invece di essere eliminati. I dati salvati column contengono dati che non vengono analizzati per i motivi seguenti:
- Il column non è presente nel schema.
- Mancata corrispondenza del tipo.
- Mancata corrispondenza tra maiuscole e minuscole.
I dati salvati column contengono un codice JSON contenente il columns salvato e il percorso del file di origine del record.
Nota
I parser JSON e CSV supportano tre modalità durante l'analisi dei record: PERMISSIVE
, DROPMALFORMED
e FAILFAST
. Se usato insieme a rescuedDataColumn
, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED
oppure generano un errore in modalità FAILFAST
. Solo i record danneggiati vengono eliminati o generati errori, ad esempio JSON incompleto o in formato non valido o CSV. Se si usa durante l'analisi badRecordsPath
di JSON o CSV, le mancate corrispondenze del tipo di dati non vengono considerate come record non validi quando si usa .rescuedDataColumn
Solo i record JSON o CSV incompleti e in formato non valido vengono archiviati in badRecordsPath
.
Modificare il comportamento con distinzione tra maiuscole e minuscole
A meno che non sia abilitata la distinzione tra maiuscole e minuscole, l'columns,abc
, Abc
e ABC
vengono considerati gli stessi column ai fini dell'inferenza schema. Il caso scelto è arbitrario e dipende dai dati campionati. È possibile usare schema hint per applicare il caso da usare. Dopo che è stata effettuata una selezione e il schema è stato dedotto, Auto Loader non considera le varianti di maiuscole e minuscole non selezionate coerenti con il schema.
Quando è abilitata la column dei dati salvati, i campi denominati in un caso diverso da quello del schema vengono caricati nella _rescued_data
column. Modificare questo comportamento impostando l'opzione readerCaseSensitive
su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.
eseguire l'override dell'inferenza schema con i suggerimenti schema
È possibile usare schema hint per applicare le informazioni schema che si conoscono e si prevedono in un schemadedotto. Quando si è certi che un column è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un double
anziché un integer
, è possibile fornire un numero arbitrario di hint per i tipi di dati column come stringa usando la sintassi delle specifiche di SQL schema, ad esempio:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Consultare la documentazione sui tipi di dati per list dei tipi di dati supportati.
Se un column non è presente all'inizio del flusso, puoi anche usare i suggerimenti schema per aggiungere column all'schemadedotto.
Di seguito è riportato un esempio di schema dedotto per visualizzare il comportamento con hint schema.
schemadedotto:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Specificando i seguenti suggerimenti schema:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
tu get:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Nota
Il supporto degli hint schema per array e mappa è disponibile in Databricks Runtime 9.1 LTS e versioni successive.
Di seguito è riportato un esempio di schema dedotto con tipi di dati complessi per visualizzare il comportamento con hint schema.
schemadedotto:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Specificando i seguenti suggerimenti schema:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
tu get:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Nota
Schema hint vengono usati solo se si non fornire un schema al caricatore automatico. È possibile usare il suggerimento schema indipendentemente dal fatto che cloudFiles.inferColumnTypes
sia abilitato o disabilitato.