Condividi tramite


Configurare l'inferenza e l'evoluzione dello schema nel caricatore automatico

È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo.

Il caricatore automatico può anche "salvare" dati imprevisti (ad esempio, di tipi di dati differenti) in una colonna blob JSON. È possibile scegliere di accedervi in un secondo momento utilizzando le API di accesso ai dati semistrutturati .

Per l'inferenza e l'evoluzione dello schema sono supportati i formati seguenti:

Formato di file Versioni supportate
JSON Tutte le versioni
CSV Tutte le versioni
XML Databricks Runtime 14.3 LTS e versioni successive
Avro Databricks Runtime 10.4 LTS e versioni successive
Parquet Databricks Runtime 11.3 LTS e versioni successive
ORC Non supportato
Text Non applicabile (schema fisso)
Binaryfile Non applicabile (a schema fisso)

Sintassi per l'inferenza e l'evoluzione dello schema

La specifica di una directory di destinazione per l'opzione cloudFiles.schemaLocation abilita l'inferenza dello schema e l'evoluzione. È possibile scegliere di usare la stessa directory specificata per checkpointLocation. Se si usa DLT, Azure Databricks gestisce automaticamente la posizione dello schema e altre informazioni sul checkpoint.

Nota

Se nella tabella di destinazione sono stati caricati più percorsi dati di origine, ogni carico di lavoro di inserimento automatico del caricatore richiede un checkpoint di streaming separato.

Nel seguente esempio, viene usato parquet per il cloudFiles.format. Usare csv, avroo json per altre origini file. Tutte le altre impostazioni per la lettura e la scrittura rimangono invariate per i comportamenti predefiniti per ogni formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Come funziona l'inferenza dello schema del caricatore automatico?

Per dedurre lo schema durante la prima lettura dei dati, il caricatore automatico campiona i primi 50 GB o 1000 file individuati, a qualunque limite venga superato per primo. Il caricatore automatico archivia le informazioni sullo schema in una directory _schemas nel cloudFiles.schemaLocation configurato per tenere traccia delle modifiche dello schema dei dati di input nel corso del tempo.

Nota

Per modificare le dimensioni dell'esempio usato, è possibile impostare le configurazioni SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(stringa di byte, ad esempio 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(intero)

Per impostazione predefinita, l'inferenza dello schema di Auto Loader cerca di evitare problemi di evoluzione dello schema a causa di mancata corrispondenza dei tipi. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), Il caricatore automatico deduce tutte le colonne come stringhe (inclusi i campi annidati nei file JSON). Per i formati con schema tipizzato (Parquet e Avro), Il caricatore automatico esegue l'esempio di un subset di file e unisce gli schemi dei singoli file. Questo comportamento è riepilogato nella tabella seguente:

Formato di file Tipo di dati dedotto predefinito
JSON Stringa
CSV Stringa
XML Stringa
Avro Tipi codificati nello schema Avro
Parquet Tipi codificati nello schema Parquet

Apache Spark DataFrameReader usa un comportamento diverso per l'inferenza dello schema, selezionando i tipi di dati per le colonne in origini JSON, CSV e XML in base ai dati di esempio. Per abilitare questo comportamento con il caricatore automatico, impostare l'opzione cloudFiles.inferColumnTypes su true.

Nota

Quando si deduce lo schema per i dati CSV, il caricatore automatico presuppone che i file contengano intestazioni. Se i file CSV non contengono intestazioni, fornire l'opzione .option("header", "false"). Inoltre, il caricatore automatico unisce gli schemi di tutti i file nell'esempio per ottenere uno schema globale. Il caricatore automatico può quindi leggere ogni file in base all'intestazione e analizzare correttamente il file CSV.

Nota

Quando una colonna ha tipi di dati diversi in due file Parquet, il caricatore automatico sceglie il tipo più ampio. È possibile usare schemaHints per eseguire l'override di questa scelta. Quando si specificano suggerimenti per lo schema, Auto Loader non esegue il cast della colonna al tipo specificato, ma indica al lettore Parquet di interpretare la colonna come del tipo specificato. In caso di mancata corrispondenza, la colonna viene recuperata nella colonna di dati recuperati .

Come funziona l'evoluzione dello schema del caricatore automatico?

Il caricatore automatico rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando il caricatore automatico rileva una nuova colonna, il flusso si arresta con un UnknownFieldException. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza dello schema sull'ultimo micro-batch di dati e aggiorna il percorso dello schema con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati.

Databricks consiglia di configurare flussi del caricatore automatico con processi di Databricks per riavviare automaticamente dopo tali modifiche dello schema.

Il caricatore automatico supporta le modalità seguenti per l'evoluzione dello schema, impostata nell'opzione cloudFiles.schemaEvolutionMode:

Modalità Comportamento durante la lettura di una nuova colonna
addNewColumns (predefinito) Il flusso si interrompe. Le nuove colonne vengono aggiunte allo schema. Le colonne esistenti non evolvono i tipi di dati.
rescue Lo schema non è mai evoluto e il flusso non ha esito negativo a causa di modifiche dello schema. Tutte le nuove colonne vengono registrate nella colonna di dati recuperata .
failOnNewColumns Il flusso fallisce. Stream non viene riavviato a meno che lo schema specificato non venga aggiornato o che il file di dati che causa l'offesa venga rimosso.
none Non evolve lo schema, le nuove colonne vengono ignorate e i dati non vengono salvati a meno che non sia impostata l'opzione rescuedDataColumn. Il flusso non fallisce a causa di modifiche allo schema.

Nota

addNewColumns modalità è l'impostazione predefinita quando non viene fornito uno schema, ma none è l'impostazione predefinita quando si specifica uno schema. addNewColumns non è consentito quando viene fornito lo schema del flusso, ma funziona se si specifica lo schema come indicazione dello schema .

Come funzionano le partizioni con l'Autoloader?

Il caricatore automatico tenta di dedurre le colonne di partizione dalla struttura di directory sottostante dei dati, nel caso in cui questi siano disposti secondo il partizionamento in stile Hive. Ad esempio, il percorso del file base_path/event=click/date=2021-04-01/f0.json comporta l'inferenza di date e event come colonne di partizione. Se la struttura di directory sottostante contiene partizioni Hive in conflitto o non contiene il partizionamento dello stile Hive, le colonne di partizione vengono ignorate.

I file binari (binaryFile) e i formati di file text hanno schemi di dati fissi, ma supportano l'inferenza delle colonne di partizione. Databricks consiglia di impostare cloudFiles.schemaLocation per questi formati di file. In questo modo si evitano potenziali errori o perdite di informazioni e si impedisce l'inferenza delle colonne delle partizioni ogni volta che inizia un caricatore automatico.

Le colonne di partizione non vengono considerate per l'evoluzione dello schema. Se si dispone di una struttura di directory iniziale come base_path/event=click/date=2021-04-01/f0.jsone poi si inizia a ricevere nuovi file come base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignora la colonna ora. Per acquisire informazioni per le nuove colonne di partizione, impostare cloudFiles.partitionColumns su event,date,hour.

Nota

L'opzione cloudFiles.partitionColumns accetta un elenco delimitato da virgole di nomi di colonna. Vengono analizzate solo le colonne che esistono come coppie di key=value nella struttura della directory.

Qual è la colonna di dati salvata?

Quando il caricatore automatico deduce lo schema, una colonna di dati salvata viene aggiunta automaticamente allo schema come _rescued_data. È possibile rinominare la colonna o includerla nei casi in cui si specifica uno schema impostando l'opzione rescuedDataColumn.

La colonna di dati salvata garantisce che le colonne che non corrispondono allo schema vengano salvate anziché essere eliminate. La colonna di dati salvata contiene tutti i dati che non vengono analizzati per i motivi seguenti:

  • La colonna non è presente nello schema.
  • Incongruenze dei tipi
  • Mancata corrispondenza tra maiuscole e minuscole.

La colonna di dati salvata contiene un codice JSON contenente le colonne salvate e il percorso del file di origine del record.

Nota

I parser JSON e CSV supportano tre modalità durante l'analisi dei record: PERMISSIVE, DROPMALFORMEDe FAILFAST. Se usato insieme a rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED oppure generano un errore in modalità FAILFAST. Solo i record danneggiati vengono eliminati o generati errori, ad esempio JSON incompleto o in formato non valido o CSV. Se si usa il badRecordsPath per analizzare i JSON o i CSV, le incongruenze nei tipi di dati non vengono considerate come record non validi quando si usa il rescuedDataColumn. Solo i record JSON o CSV incompleti e in formato non valido vengono archiviati in badRecordsPath.

Modificare il comportamento con distinzione tra maiuscole e minuscole

A meno che non sia abilitata la distinzione tra maiuscole e minuscole, le colonne abc, Abce ABC vengono considerate la stessa colonna ai fini dell'inferenza dello schema. Il caso scelto è arbitrario e dipende dai dati campionati. È possibile usare suggerimenti dello schema per imporre quale caso deve essere utilizzato. Dopo aver effettuato una selezione e aver dedotto lo schema, il caricatore automatico non considera le varianti di maiuscole e minuscole non selezionate in modo coerente con lo schema.

Quando colonna di dati salvata è abilitata, i campi denominati in un caso diverso da quello dello schema vengono caricati nella colonna _rescued_data. Modificare questo comportamento impostando l'opzione readerCaseSensitive su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.

Sovrascrivere l'inferenza dello schema con le indicazioni dello schema

È possibile usare gli hint dello schema per applicare le informazioni sullo schema che si conoscono e si prevedono in uno schema dedotto. Quando si sa che una colonna è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un double anziché un integer, è possibile fornire un numero arbitrario di hint per i tipi di dati di colonna come stringa usando la sintassi della specifica dello schema SQL, ad esempio:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Vedere la documentazione sui tipi di dati per l'elenco dei tipi di dati supportati.

Se una colonna non è presente all'inizio del flusso, è anche possibile usare gli hint dello schema per aggiungere tale colonna allo schema dedotto.

Di seguito è riportato un esempio di schema dedotto per visualizzare il comportamento con gli hint dello schema.

Schema dedotto:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Specificando gli hint dello schema seguenti:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

ottieni:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

Il supporto per gli hint degli schemi di array e mappa è disponibile in Databricks Runtime 9.1 LTS e versioni successive.

Di seguito è riportato un esempio di schema dedotto con tipi di dati complessi per visualizzare il comportamento con gli hint dello schema.

Schema dedotto:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Specificando gli hint dello schema seguenti:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

ottieni:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

Gli hint dello schema vengono usati solo se si non fornire uno schema al caricatore automatico. È possibile usare gli hint per lo schema sia che cloudFiles.inferColumnTypes sia abilitato o disabilitato.