Configurare l'inferenza e l'evoluzione dello schema nel caricatore automatico
È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo.
Il caricatore automatico può anche "salvare" dati imprevisti (ad esempio, di tipi di dati differenti) in una colonna blob JSON. È possibile scegliere di accedervi in un secondo momento utilizzando le API di accesso ai dati semistrutturati .
Per l'inferenza e l'evoluzione dello schema sono supportati i formati seguenti:
Formato di file | Versioni supportate |
---|---|
JSON |
Tutte le versioni |
CSV |
Tutte le versioni |
XML |
Databricks Runtime 14.3 LTS e versioni successive |
Avro |
Databricks Runtime 10.4 LTS e versioni successive |
Parquet |
Databricks Runtime 11.3 LTS e versioni successive |
ORC |
Non supportato |
Text |
Non applicabile (schema fisso) |
Binaryfile |
Non applicabile (a schema fisso) |
Sintassi per l'inferenza e l'evoluzione dello schema
La specifica di una directory di destinazione per l'opzione cloudFiles.schemaLocation
abilita l'inferenza dello schema e l'evoluzione. È possibile scegliere di usare la stessa directory specificata per checkpointLocation
. Se si usa DLT, Azure Databricks gestisce automaticamente la posizione dello schema e altre informazioni sul checkpoint.
Nota
Se nella tabella di destinazione sono stati caricati più percorsi dati di origine, ogni carico di lavoro di inserimento automatico del caricatore richiede un checkpoint di streaming separato.
Nel seguente esempio, viene usato parquet
per il cloudFiles.format
. Usare csv
, avro
o json
per altre origini file. Tutte le altre impostazioni per la lettura e la scrittura rimangono invariate per i comportamenti predefiniti per ogni formato.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Come funziona l'inferenza dello schema del caricatore automatico?
Per dedurre lo schema durante la prima lettura dei dati, il caricatore automatico campiona i primi 50 GB o 1000 file individuati, a qualunque limite venga superato per primo. Il caricatore automatico archivia le informazioni sullo schema in una directory _schemas
nel cloudFiles.schemaLocation
configurato per tenere traccia delle modifiche dello schema dei dati di input nel corso del tempo.
Nota
Per modificare le dimensioni dell'esempio usato, è possibile impostare le configurazioni SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(stringa di byte, ad esempio 10gb
)
e
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(intero)
Per impostazione predefinita, l'inferenza dello schema di Auto Loader cerca di evitare problemi di evoluzione dello schema a causa di mancata corrispondenza dei tipi. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), Il caricatore automatico deduce tutte le colonne come stringhe (inclusi i campi annidati nei file JSON). Per i formati con schema tipizzato (Parquet e Avro), Il caricatore automatico esegue l'esempio di un subset di file e unisce gli schemi dei singoli file. Questo comportamento è riepilogato nella tabella seguente:
Formato di file | Tipo di dati dedotto predefinito |
---|---|
JSON |
Stringa |
CSV |
Stringa |
XML |
Stringa |
Avro |
Tipi codificati nello schema Avro |
Parquet |
Tipi codificati nello schema Parquet |
Apache Spark DataFrameReader usa un comportamento diverso per l'inferenza dello schema, selezionando i tipi di dati per le colonne in origini JSON, CSV e XML in base ai dati di esempio. Per abilitare questo comportamento con il caricatore automatico, impostare l'opzione cloudFiles.inferColumnTypes
su true
.
Nota
Quando si deduce lo schema per i dati CSV, il caricatore automatico presuppone che i file contengano intestazioni. Se i file CSV non contengono intestazioni, fornire l'opzione .option("header", "false")
. Inoltre, il caricatore automatico unisce gli schemi di tutti i file nell'esempio per ottenere uno schema globale. Il caricatore automatico può quindi leggere ogni file in base all'intestazione e analizzare correttamente il file CSV.
Nota
Quando una colonna ha tipi di dati diversi in due file Parquet, il caricatore automatico sceglie il tipo più ampio. È possibile usare schemaHints per eseguire l'override di questa scelta. Quando si specificano suggerimenti per lo schema, Auto Loader non esegue il cast della colonna al tipo specificato, ma indica al lettore Parquet di interpretare la colonna come del tipo specificato. In caso di mancata corrispondenza, la colonna viene recuperata nella colonna di dati recuperati .
Come funziona l'evoluzione dello schema del caricatore automatico?
Il caricatore automatico rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando il caricatore automatico rileva una nuova colonna, il flusso si arresta con un UnknownFieldException
. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza dello schema sull'ultimo micro-batch di dati e aggiorna il percorso dello schema con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati.
Databricks consiglia di configurare flussi del caricatore automatico con processi di Databricks per riavviare automaticamente dopo tali modifiche dello schema.
Il caricatore automatico supporta le modalità seguenti per l'evoluzione dello schema, impostata nell'opzione cloudFiles.schemaEvolutionMode
:
Nota
addNewColumns
modalità è l'impostazione predefinita quando non viene fornito uno schema, ma none
è l'impostazione predefinita quando si specifica uno schema.
addNewColumns
non è consentito quando viene fornito lo schema del flusso, ma funziona se si specifica lo schema come indicazione dello schema .
Come funzionano le partizioni con l'Autoloader?
Il caricatore automatico tenta di dedurre le colonne di partizione dalla struttura di directory sottostante dei dati, nel caso in cui questi siano disposti secondo il partizionamento in stile Hive. Ad esempio, il percorso del file base_path/event=click/date=2021-04-01/f0.json
comporta l'inferenza di date
e event
come colonne di partizione. Se la struttura di directory sottostante contiene partizioni Hive in conflitto o non contiene il partizionamento dello stile Hive, le colonne di partizione vengono ignorate.
I file binari (binaryFile
) e i formati di file text
hanno schemi di dati fissi, ma supportano l'inferenza delle colonne di partizione. Databricks consiglia di impostare cloudFiles.schemaLocation
per questi formati di file. In questo modo si evitano potenziali errori o perdite di informazioni e si impedisce l'inferenza delle colonne delle partizioni ogni volta che inizia un caricatore automatico.
Le colonne di partizione non vengono considerate per l'evoluzione dello schema. Se si dispone di una struttura di directory iniziale come base_path/event=click/date=2021-04-01/f0.json
e poi si inizia a ricevere nuovi file come base_path/event=click/date=2021-04-01/hour=01/f1.json
, Auto Loader ignora la colonna ora. Per acquisire informazioni per le nuove colonne di partizione, impostare cloudFiles.partitionColumns
su event,date,hour
.
Nota
L'opzione cloudFiles.partitionColumns
accetta un elenco delimitato da virgole di nomi di colonna. Vengono analizzate solo le colonne che esistono come coppie di key=value
nella struttura della directory.
Qual è la colonna di dati salvata?
Quando il caricatore automatico deduce lo schema, una colonna di dati salvata viene aggiunta automaticamente allo schema come _rescued_data
. È possibile rinominare la colonna o includerla nei casi in cui si specifica uno schema impostando l'opzione rescuedDataColumn
.
La colonna di dati salvata garantisce che le colonne che non corrispondono allo schema vengano salvate anziché essere eliminate. La colonna di dati salvata contiene tutti i dati che non vengono analizzati per i motivi seguenti:
- La colonna non è presente nello schema.
- Incongruenze dei tipi
- Mancata corrispondenza tra maiuscole e minuscole.
La colonna di dati salvata contiene un codice JSON contenente le colonne salvate e il percorso del file di origine del record.
Nota
I parser JSON e CSV supportano tre modalità durante l'analisi dei record: PERMISSIVE
, DROPMALFORMED
e FAILFAST
. Se usato insieme a rescuedDataColumn
, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED
oppure generano un errore in modalità FAILFAST
. Solo i record danneggiati vengono eliminati o generati errori, ad esempio JSON incompleto o in formato non valido o CSV. Se si usa il badRecordsPath
per analizzare i JSON o i CSV, le incongruenze nei tipi di dati non vengono considerate come record non validi quando si usa il rescuedDataColumn
. Solo i record JSON o CSV incompleti e in formato non valido vengono archiviati in badRecordsPath
.
Modificare il comportamento con distinzione tra maiuscole e minuscole
A meno che non sia abilitata la distinzione tra maiuscole e minuscole, le colonne abc
, Abc
e ABC
vengono considerate la stessa colonna ai fini dell'inferenza dello schema. Il caso scelto è arbitrario e dipende dai dati campionati. È possibile usare suggerimenti dello schema per imporre quale caso deve essere utilizzato. Dopo aver effettuato una selezione e aver dedotto lo schema, il caricatore automatico non considera le varianti di maiuscole e minuscole non selezionate in modo coerente con lo schema.
Quando colonna di dati salvata è abilitata, i campi denominati in un caso diverso da quello dello schema vengono caricati nella colonna _rescued_data
. Modificare questo comportamento impostando l'opzione readerCaseSensitive
su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.
Sovrascrivere l'inferenza dello schema con le indicazioni dello schema
È possibile usare gli hint dello schema per applicare le informazioni sullo schema che si conoscono e si prevedono in uno schema dedotto. Quando si sa che una colonna è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un double
anziché un integer
, è possibile fornire un numero arbitrario di hint per i tipi di dati di colonna come stringa usando la sintassi della specifica dello schema SQL, ad esempio:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
Vedere la documentazione sui tipi di dati per l'elenco dei tipi di dati supportati.
Se una colonna non è presente all'inizio del flusso, è anche possibile usare gli hint dello schema per aggiungere tale colonna allo schema dedotto.
Di seguito è riportato un esempio di schema dedotto per visualizzare il comportamento con gli hint dello schema.
Schema dedotto:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Specificando gli hint dello schema seguenti:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
ottieni:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Nota
Il supporto per gli hint degli schemi di array e mappa è disponibile in Databricks Runtime 9.1 LTS e versioni successive.
Di seguito è riportato un esempio di schema dedotto con tipi di dati complessi per visualizzare il comportamento con gli hint dello schema.
Schema dedotto:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Specificando gli hint dello schema seguenti:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
ottieni:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Nota
Gli hint dello schema vengono usati solo se si non fornire uno schema al caricatore automatico. È possibile usare gli hint per lo schema sia che cloudFiles.inferColumnTypes
sia abilitato o disabilitato.