Condividi tramite


Aggiornare lo schema della tabella Delta Lake

Delta Lake consente di aggiornare lo schema di una tabella. Sono supportati i tipi seguenti di modifiche:

  • Aggiunta di nuove colonne (in posizioni arbitrarie)
  • Riordinare le colonne esistenti
  • Ridenominazione delle colonne esistenti

È possibile apportare queste modifiche in modo esplicito usando DDL o in modo implicito usando DML.

Importante

Un aggiornamento di uno schema di tabella Delta è un'operazione in conflitto con tutte le operazioni di scrittura Delta simultanee.

Quando si aggiorna uno schema della tabella Delta, i flussi letti da tale tabella terminano. Se si vuole che lo streaming continui, è necessario riavviarlo. Per i metodi consigliati, vedere Considerazioni sulla produzione per Structured Streaming.

Aggiornare in modo esplicito lo schema per aggiungere colonne

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Per impostazione predefinita, il supporto dei valori Null è true.

Per aggiungere una colonna a un campo annidato, usare:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) è:

- root
| - colA
| - colB
| +-field1
| +-field2

lo schema dopo è:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Nota

L'aggiunta di colonne annidate è supportata solo per gli struct. Le matrici e le mappe non sono supportate.

Aggiornare in modo esplicito lo schema per modificare il commento o l'ordinamento delle colonne

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Per modificare una colonna in un campo annidato, usare:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Ad esempio, se lo schema prima dell'esecuzione ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST è:

- root
| - colA
| - colB
| +-field1
| +-field2

lo schema dopo è:

- root
| - colA
| - colB
| +-field2
| +-field1

Aggiornare in modo esplicito lo schema per sostituire le colonne

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Ad esempio, quando si esegue il DDL seguente:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

se lo schema prima è:

- root
| - colA
| - colB
| +-field1
| +-field2

lo schema dopo è:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Aggiornare in modo esplicito lo schema per rinominare le colonne

Nota

Questa funzionalità è disponibile in Databricks Runtime 10.4 LTS e versioni successive.

Per rinominare le colonne senza riscrivere i dati esistenti delle colonne, è necessario abilitare il mapping delle colonne per la tabella. Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.

Per rinominare una colonna:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Per rinominare un campo annidato:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Ad esempio, quando si esegue il comando seguente:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Se lo schema prima è:

- root
| - colA
| - colB
| +-field1
| +-field2

Lo schema dopo è quindi:

- root
| - colA
| - colB
| +-field001
| +-field2

Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.

Aggiornare in modo esplicito lo schema per eliminare le colonne

Nota

Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.

Per eliminare le colonne come operazione di sola metadati senza riscrivere alcun file di dati, è necessario abilitare il mapping delle colonne per la tabella. Si veda Rinominare ed eliminare le colonne con mapping di colonne Delta Lake.

Importante

L'eliminazione di una colonna dai metadati non comporta l'eliminazione dei dati sottostanti per la colonna nei file. Per eliminare i dati delle colonne eliminate, è possibile usare REORG TABLE per riscrivere i file. È quindi possibile usare VACUUM per eliminare fisicamente i file che contengono i dati delle colonne eliminate.

Per eliminare una colonna:

ALTER TABLE table_name DROP COLUMN col_name

Per eliminare più colonne:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Aggiornare in modo esplicito lo schema per modificare il tipo di colonna o il nome

È possibile modificare il tipo o il nome di una colonna o eliminare una colonna riscrivendo la tabella. A tale scopo, usare l'opzione overwriteSchema .

L'esempio seguente illustra la modifica di un tipo di colonna:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

L'esempio seguente mostra la modifica di un nome di colonna:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Abilitare l'evoluzione dello schema

È possibile abilitare l'evoluzione dello schema eseguendo una delle operazioni seguenti:

Databricks consiglia di abilitare l'evoluzione dello schema per ogni operazione di scrittura anziché impostare una conf Spark.

Quando si usano opzioni o sintassi per abilitare l'evoluzione dello schema in un'operazione di scrittura, questa ha la precedenza sulla conf spark.

Nota

Non esiste alcuna clausola di evoluzione dello schema per INSERT INTO le istruzioni.

Abilitare l'evoluzione dello schema per le scritture per aggiungere nuove colonne

Le colonne presenti nella query di origine ma mancanti nella tabella di destinazione vengono aggiunte automaticamente come parte di una transazione di scrittura quando l'evoluzione dello schema è abilitata. Vedere Abilitare l'evoluzione dello schema.

La distinzione tra maiuscole e minuscole viene mantenuta quando si aggiunge una nuova colonna. Le nuove colonne vengono aggiunte alla fine dello schema della tabella. Se le colonne aggiuntive si trovano in uno struct, vengono aggiunte alla fine dello struct nella tabella di destinazione.

L'esempio seguente illustra l'uso dell'opzione mergeSchema con il caricatore automatico. Vedere Che cos'è l’Autoloader?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

L'esempio seguente illustra l'uso dell'opzione mergeSchema con un'operazione di scrittura batch:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evoluzione automatica dello schema per l'unione delta Lake

L'evoluzione dello schema consente agli utenti di risolvere le mancate corrispondenze dello schema tra la tabella di destinazione e quella di origine in merge. Gestisce i due casi seguenti:

  1. Una colonna nella tabella di origine non è presente nella tabella di destinazione. La nuova colonna viene aggiunta allo schema di destinazione e i relativi valori vengono inseriti o aggiornati usando i valori di origine.
  2. Una colonna nella tabella di destinazione non è presente nella tabella di origine. Lo schema di destinazione rimane invariato; I valori nella colonna di destinazione aggiuntiva vengono lasciati invariati (per UPDATE) o impostati su NULL (per INSERT).

È necessario abilitare manualmente l'evoluzione automatica dello schema. Vedere Abilitare l'evoluzione dello schema.

Nota

In Databricks Runtime 12.2 LTS e versioni successive i campi colonne e struct presenti nella tabella di origine possono essere specificati in base al nome nelle azioni di inserimento o aggiornamento. In Databricks Runtime 11.3 LTS e versioni successive è possibile usare solo INSERT * azioni o UPDATE SET * per l'evoluzione dello schema con merge.

In Databricks Runtime 13.3 LTS e versioni successive è possibile usare l'evoluzione dello schema con struct annidati all'interno delle mappe, ad esempio map<int, struct<a: int, b: int>>.

Sintassi dell'evoluzione dello schema per l'unione

In Databricks Runtime 15.2 e versioni successive è possibile specificare l'evoluzione dello schema in un'istruzione merge usando le API di tabella SQL o Delta:

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Operazioni di esempio di unione con evoluzione dello schema

Ecco alcuni esempi degli effetti dell'operazione merge con e senza evoluzione dello schema.

Colonne Query (in SQL) Comportamento senza evoluzione dello schema (impostazione predefinita) Comportamento con l'evoluzione dello schema
Colonne di destinazione: key, value

Colonne di origine: key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Lo schema della tabella rimane invariato; solo le colonne key, value vengono aggiornate/inserite. Lo schema della tabella viene modificato in (key, value, new_value). I record esistenti con corrispondenze vengono aggiornati con value e new_value nell'origine. Le nuove righe vengono inserite con lo schema (key, value, new_value).
Colonne di destinazione: key, old_value

Colonne di origine: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
UPDATE le azioni e INSERT generano un errore perché la colonna old_value di destinazione non si trova nell'origine. Lo schema della tabella viene modificato in (key, old_value, new_value). I record esistenti con corrispondenze vengono aggiornati con nell'origine new_value lasciando old_value invariati. I nuovi record vengono inseriti con l'oggetto , new_valuee NULL specificato keyper .old_value
Colonne di destinazione: key, old_value

Colonne di origine: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE genera un errore perché la colonna new_value non esiste nella tabella di destinazione. Lo schema della tabella viene modificato in (key, old_value, new_value). I record esistenti con corrispondenze vengono aggiornati con nell'oggetto new_value nell'origine lasciando old_value invariati e i record non corrispondenti sono NULL stati immessi per new_value. Vedere la nota (1).
Colonne di destinazione: key, old_value

Colonne di origine: key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT genera un errore perché la colonna new_value non esiste nella tabella di destinazione. Lo schema della tabella viene modificato in (key, old_value, new_value). I nuovi record vengono inseriti con l'oggetto , new_valuee NULL specificato keyper .old_value I record esistenti sono NULL stati immessi per new_value lasciare old_value invariati. Vedere la nota (1).

(1) Questo comportamento è disponibile in Databricks Runtime 12.2 LTS e versioni successive; Databricks Runtime 11.3 LTS e sotto l'errore in questa condizione.

Escludere colonne con merge Delta Lake

In Databricks Runtime 12.2 LTS e versioni successive è possibile usare EXCEPT le clausole nelle condizioni di merge per escludere in modo esplicito le colonne. Il comportamento della EXCEPT parola chiave varia a seconda che l'evoluzione dello schema sia abilitata o meno.

Con l'evoluzione dello schema disabilitata, la EXCEPT parola chiave si applica all'elenco di colonne nella tabella di destinazione e consente di escludere colonne da UPDATE o INSERT azioni. Le colonne escluse sono impostate su null.

Con l'evoluzione dello schema abilitata, la EXCEPT parola chiave si applica all'elenco di colonne nella tabella di origine e consente di escludere colonne dall'evoluzione dello schema. Una nuova colonna nell'origine non presente nella destinazione non viene aggiunta allo schema di destinazione se è elencata nella EXCEPT clausola . Le colonne escluse già presenti nella destinazione sono impostate su null.

Gli esempi seguenti illustrano questa sintassi:

Colonne Query (in SQL) Comportamento senza evoluzione dello schema (impostazione predefinita) Comportamento con l'evoluzione dello schema
Colonne di destinazione: id, title, last_updated

Colonne di origine: id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Le nuove righe vengono inserite usando i valori per id e title. Il campo last_updated escluso è impostato su null. Il campo review viene ignorato perché non si trova nella destinazione. Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Lo schema si è evoluto per aggiungere il campo review. Le nuove righe vengono inserite usando tutti i campi di origine, ad eccezione last_updated del quale è impostato su null.
Colonne di destinazione: id, title, last_updated

Colonne di origine: id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT genera un errore perché la colonna internal_count non esiste nella tabella di destinazione. Le righe corrispondenti vengono aggiornate impostando il last_updated campo sulla data corrente. Il review campo viene aggiunto alla tabella di destinazione, ma il internal_count campo viene ignorato. Le nuove righe inserite sono last_updated impostate su null.

NullType Gestione delle colonne negli aggiornamenti dello schema

Poiché Parquet non supporta NullType, NullType le colonne vengono eliminate dal dataframe durante la scrittura in tabelle Delta, ma vengono comunque archiviate nello schema. Quando viene ricevuto un tipo di dati diverso per tale colonna, Delta Lake unisce lo schema al nuovo tipo di dati. Se Delta Lake riceve un oggetto NullType per una colonna esistente, lo schema precedente viene mantenuto e la nuova colonna viene eliminata durante la scrittura.

NullType in streaming non è supportato. Poiché è necessario impostare gli schemi quando si usa lo streaming, questo dovrebbe essere molto raro. NullType non è inoltre accettato per tipi complessi, ad ArrayType esempio e MapType.

Sostituire lo schema delle tabelle

Per impostazione predefinita, la sovrascrittura dei dati in una tabella non sovrascrive lo schema. Quando si sovrascrive una tabella usando mode("overwrite") senza replaceWhere, è comunque possibile sovrascrivere lo schema dei dati scritti. Sostituire lo schema e il partizionamento della tabella impostando l'opzione overwriteSchema su true:

df.write.option("overwriteSchema", "true")

Importante

Non è possibile specificare overwriteSchema come true quando si usa la sovrascrittura della partizione dinamica.