Condividi tramite


Configurare l'inferenza e l'evoluzione nel caricatore automatico schema

È possibile configurare il caricatore automatico per rilevare automaticamente il schema dei dati caricati, consentendo di inizializzare tables senza dichiarare esplicitamente il schema ed evolvere il tableschema man mano che vengono introdotti nuovi columns. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche schema nel tempo.

Il caricatore automatico può anche "salvare" dati imprevisti (ad esempio, di tipi di dati diversi) in un BLOB JSON column, a cui è possibile accedere in un secondo momento usando le API di accesso ai dati semistrutturati .

I seguenti formati sono supportati per l'inferenza e l'evoluzione schema.

Formato di file Versioni supportate
JSON Tutte le versioni
CSV Tutte le versioni
XML Databricks Runtime 14.3 LTS e versioni successive
Avro Databricks Runtime 10.4 LTS e versioni successive
Parquet Databricks Runtime 11.3 LTS e versioni successive
ORC Non supportato
Text Non applicabile (fixed-schema)
Binaryfile Non applicabile (fixed-schema)

Sintassi inferenziale ed evolutiva per schema

Specificando una directory di destinazione per l'opzione cloudFiles.schemaLocation, si abilita l'inferenza e l'evoluzione di schema. È possibile scegliere di usare la stessa directory specificata per checkpointLocation. Se si usa Delta Live Tables, Azure Databricks gestisce automaticamente le altre informazioni sul checkpoint e la posizione schema.

Nota

Se sono state caricate più di un'origine dati nel target table, ogni processo di inserimento tramite Auto Loader richiede un checkpoint di streaming separato.

Nell'esempio seguente viene parquet usato per .cloudFiles.format Usare csv, avroo json per altre origini file. Tutte le altre impostazioni per la lettura e la scrittura rimangono invariate per i comportamenti predefiniti per ogni formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Come funziona l'inferenza di Auto Loader schema?

Per dedurre il schema durante la prima lettura dei dati, il caricatore automatico esegue l'esempio dei primi 50 GB o 1000 file individuati, a qualsiasi limit venga superato per primo. Il caricatore automatico archivia le informazioni di schema in una directory _schemas nel cloudFiles.schemaLocation configurato per tenere traccia delle modifiche schema ai dati di input nel tempo.

Nota

Per modificare le dimensioni dell'esempio usato, è possibile set le configurazioni SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(stringa di byte, ad esempio 10gb)

e

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(integer)

Per impostazione predefinita, l'inferenza del caricatore automatico schema cerca di evitare problemi di evoluzione schema a causa di mancate corrispondenze del tipo. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), il caricatore automatico deduce tutti i columns come stringhe (inclusi i campi annidati nei file JSON). Per i formati con schema tipizzato (Parquet e Avro), il caricatore automatico esegue il campionamento di un sottoinsieme di file e fonde gli schemi dei singoli file. Questo comportamento è riepilogato nei tableseguenti:

Formato di file Tipo di dati dedotto predefinito
JSON String
CSV String
XML String
Avro Tipi codificati in Avro schema
Parquet Tipi codificati in Parquet schema

Apache Spark DataFrameReader utilizza un comportamento diverso per l'inferenza di schema, selezionando i tipi di dati per columns nelle origini JSON, CSV e XML basandosi sui dati di esempio. Per abilitare questo comportamento con il caricatore automatico, set l'opzione cloudFiles.inferColumnTypes per true.

Nota

Quando si deduce il schema per i dati CSV, il caricatore automatico presuppone che i file contengano intestazioni. Se i file CSV non contengono intestazioni, fornire l'opzione .option("header", "false"). Inoltre, Auto Loader unisce gli schemi di tutti i file nel campione per ottenere uno schema globale schema. Il caricatore automatico può quindi leggere ogni file in base all'intestazione e analizzare correttamente il file CSV.

Nota

Quando un column ha tipi di dati diversi in due file Parquet, Auto Loader sceglie il tipo più ampio. È possibile usare schemaHints per eseguire l'override di questa scelta. Quando si specificano hint schema, Auto Loader non esegue il cast di column nel tipo specificato, ma indica al lettore Parquet di leggere column come corrispondente al tipo specificato. In caso di mancata corrispondenza, il column viene recuperato nei dati salvati column.

Come funziona l'evoluzione del caricatore automatico schema?

Il sistema di caricamento automatico rileva l'aggiunta di nuovi columns durante l'elaborazione dei tuoi dati. Quando l'Auto Loader rileva un nuovo column, il flusso si arresta con un UnknownFieldException. Prima che il tuo flusso generi questo errore, Auto Loader esegue un'inferenza schema sul micro-batch di dati più recente e aggiorna la posizione schema con l'ultimo schema aggiungendo nuovi columns alla fine del schema. I tipi di dati di columns esistenti rimangono invariati.

Databricks consiglia di configurare i flussi del Caricatore Automatico con Jobs di Databricks per riavviarsi automaticamente dopo tali modifiche schema.

Il caricatore automatico supporta le seguenti modalità per l'evoluzione schema, che puoi set nell'opzione cloudFiles.schemaEvolutionMode:

Modalità Comportamento durante la lettura di nuovi column
addNewColumns (predefinito) Flusso non riesce. I nuovi columns vengono aggiunti al schema. I columns esistenti non modificano i tipi di dati.
rescue Schema non viene mai evoluto e il flusso non fallisce a causa delle modifiche di schema. Tutte le nuove columns vengono registrate nei dati recuperati column.
failOnNewColumns Flusso non riesce. Stream non si riavvia a meno che il schema specificato non venga aggiornato o il file di dati problematico venga rimosso.
none Non si evolve il schema, le nuove columns vengono ignorate e i dati non vengono recuperati a meno che l'opzione rescuedDataColumn non sia set. Lo stream non fallisce a causa delle modifiche schema.

Nota

La modalità addNewColumns è quella predefinita quando non viene specificata una schema, ma la modalità none viene utilizzata di default quando è fornita una schema. addNewColumns non è consentito quando viene fornito il schema del flusso, ma funziona se si specifica il schema come hint schema.

Come funzionano le partizioni con l'Autoloader?

Auto Loader tenta di dedurre partitioncolumns dalla struttura di directory sottostante dei dati se i dati sono disposti secondo il partizionamento in stile Hive. Ad esempio, il percorso del file base_path/event=click/date=2021-04-01/f0.json comporta l'inferenza di date e event come partitioncolumns. Se la struttura di directory sottostante contiene partizioni Hive in conflitto o non contiene il partizionamento dello stile Hive, partitioncolumns vengono ignorati.

Il file binario (binaryFile) e i formati di file text hanno schemi di dati fissi, ma supportano l'inferenza partitioncolumn. Databricks consiglia di impostare cloudFiles.schemaLocation per questi formati di file. In questo modo si evitano potenziali errori o perdite di informazioni e si impedisce l'inferenza delle partizioni columns ogni volta che inizia un caricatore automatico.

Partition columns non sono considerati per l'evoluzione di schema. Se si dispone di una struttura di directory iniziale come base_path/event=click/date=2021-04-01/f0.jsone quindi di iniziare a ricevere nuovi file come base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignora l'orario column. Per acquisire informazioni sui nuovi partitioncolumns, setcloudFiles.partitionColumns fino a event,date,hour.

Nota

L'opzione cloudFiles.partitionColumns accetta un list delimitato da virgole di nomi di column. Solo i columns che esistono in coppie di key=value nella tua struttura di directory vengono analizzati.

Quali sono i dati salvati column?

Quando il caricatore automatico deduce il schema, i dati salvati column vengono automaticamente aggiunti al schema come _rescued_data. È possibile rinominare il column o includerlo nei casi where si fornisce un schema impostando l'opzione rescuedDataColumn.

I dati salvati column assicurano che columns che non corrispondono al schema vengano salvati invece di essere eliminati. I dati salvati column contengono dati che non vengono analizzati per i motivi seguenti:

  • Il column non è presente nel schema.
  • Mancata corrispondenza del tipo.
  • Mancata corrispondenza tra maiuscole e minuscole.

I dati salvati column contengono un codice JSON contenente il columns salvato e il percorso del file di origine del record.

Nota

I parser JSON e CSV supportano tre modalità durante l'analisi dei record: PERMISSIVE, DROPMALFORMEDe FAILFAST. Se usato insieme a rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED oppure generano un errore in modalità FAILFAST. Solo i record danneggiati vengono eliminati o generati errori, ad esempio JSON incompleto o in formato non valido o CSV. Se si usa durante l'analisi badRecordsPath di JSON o CSV, le mancate corrispondenze del tipo di dati non vengono considerate come record non validi quando si usa .rescuedDataColumn Solo i record JSON o CSV incompleti e in formato non valido vengono archiviati in badRecordsPath.

Modificare il comportamento con distinzione tra maiuscole e minuscole

A meno che non sia abilitata la distinzione tra maiuscole e minuscole, l'columns,abc, Abce ABC vengono considerati gli stessi column ai fini dell'inferenza schema. Il caso scelto è arbitrario e dipende dai dati campionati. È possibile usare schema hint per applicare il caso da usare. Dopo che è stata effettuata una selezione e il schema è stato dedotto, Auto Loader non considera le varianti di maiuscole e minuscole non selezionate coerenti con il schema.

Quando è abilitata la column dei dati salvati, i campi denominati in un caso diverso da quello del schema vengono caricati nella _rescued_datacolumn. Modificare questo comportamento impostando l'opzione readerCaseSensitive su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.

eseguire l'override dell'inferenza schema con i suggerimenti schema

È possibile usare schema hint per applicare le informazioni schema che si conoscono e si prevedono in un schemadedotto. Quando si è certi che un column è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un double anziché un integer, è possibile fornire un numero arbitrario di hint per i tipi di dati column come stringa usando la sintassi delle specifiche di SQL schema, ad esempio:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consultare la documentazione sui tipi di dati per list dei tipi di dati supportati.

Se un column non è presente all'inizio del flusso, puoi anche usare i suggerimenti schema per aggiungere column all'schemadedotto.

Di seguito è riportato un esempio di schema dedotto per visualizzare il comportamento con hint schema.

schemadedotto:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Specificando i seguenti suggerimenti schema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

tu get:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

Il supporto degli hint schema per array e mappa è disponibile in Databricks Runtime 9.1 LTS e versioni successive.

Di seguito è riportato un esempio di schema dedotto con tipi di dati complessi per visualizzare il comportamento con hint schema.

schemadedotto:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Specificando i seguenti suggerimenti schema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

tu get:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

Schema hint vengono usati solo se si non fornire un schema al caricatore automatico. È possibile usare il suggerimento schema indipendentemente dal fatto che cloudFiles.inferColumnTypes sia abilitato o disabilitato.