Atualizar o esquema de tabelas do Delta Lake
O Delta Lake permite-lhe atualizar o esquema de uma tabela. São suportados os seguintes tipos de alterações:
- Adicionar novas colunas (em posições arbitrárias)
- Reordenar colunas existentes
- Renomeando colunas existentes
Pode efetuar estas alterações explicitamente através de DDL ou implicitamente através de DML.
Importante
Uma atualização para um esquema de tabela Delta é uma operação que entra em conflito com todas as operações de gravação Delta simultâneas.
Quando actualiza um esquema de tabela Delta, os fluxos que lêem a partir desta tabela terminam. Se quiser que o fluxo continue, tem de reiniciá-lo. Para obter os métodos recomendados, consulte Considerações de produção para streaming estruturado.
Atualizar explicitamente o esquema para adicionar colunas
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Por padrão, a anulabilidade é true
.
Para adicionar uma coluna a um campo aninhado, use:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
Por exemplo, se o esquema antes da execução ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
for:
- root
| - colA
| - colB
| +-field1
| +-field2
O esquema seguinte é:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
Nota
A adição de colunas aninhadas é suportada apenas para structs. Não há suporte para matrizes e mapas.
Atualizar explicitamente o esquema para alterar o comentário ou a ordenação das colunas
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Para alterar uma coluna em um campo aninhado, use:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
Por exemplo, se o esquema antes da execução ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST
for:
- root
| - colA
| - colB
| +-field1
| +-field2
O esquema seguinte é:
- root
| - colA
| - colB
| +-field2
| +-field1
Atualizar explicitamente o esquema para substituir colunas
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
Por exemplo, ao executar a seguinte DDL:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
se o esquema anterior for:
- root
| - colA
| - colB
| +-field1
| +-field2
O esquema seguinte é:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
Atualizar explicitamente o esquema para renomear colunas
Nota
Esse recurso está disponível no Databricks Runtime 10.4 LTS e superior.
Para renomear colunas sem reescrever nenhum dos dados existentes das colunas, você deve habilitar o mapeamento de colunas para a tabela. Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.
Para renomear uma coluna:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
Para renomear um campo aninhado:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
Por exemplo, quando você executa o seguinte comando:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
Se o esquema anterior for:
- root
| - colA
| - colB
| +-field1
| +-field2
Em seguida, o esquema seguinte é:
- root
| - colA
| - colB
| +-field001
| +-field2
Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.
Atualizar explicitamente o esquema para soltar colunas
Nota
Esse recurso está disponível no Databricks Runtime 11.3 LTS e superior.
Para soltar colunas como uma operação somente de metadados sem reescrever nenhum arquivo de dados, você deve habilitar o mapeamento de colunas para a tabela. Consulte Renomear e soltar colunas com o mapeamento de colunas Delta Lake.
Importante
Soltar uma coluna dos metadados não exclui os dados subjacentes da coluna nos arquivos. Para limpar os dados da coluna descartada, você pode usar REORG TABLE para reescrever arquivos. Em seguida, você pode usar VACUUM para excluir fisicamente os arquivos que contêm os dados da coluna descartada.
Para soltar uma coluna:
ALTER TABLE table_name DROP COLUMN col_name
Para soltar várias colunas:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
Atualizar explicitamente o esquema para alterar o tipo ou o nome da coluna
Você pode alterar o tipo ou o nome de uma coluna ou soltá-la reescrevendo a tabela. Para fazer isso, use a overwriteSchema
opção.
O exemplo a seguir mostra a alteração de um tipo de coluna:
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
O exemplo a seguir mostra a alteração do nome de uma coluna:
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)
Habilitar a evolução do esquema
Você pode habilitar a evolução do esquema seguindo um destes procedimentos:
- Defina o
.option("mergeSchema", "true")
como um DataFramewrite
ouwriteStream
operação do Spark. Consulte Habilitar evolução de esquema para gravações para adicionar novas colunas. - Use
MERGE WITH SCHEMA EVOLUTION
a sintaxe. Consulte Sintaxe de evolução do esquema para mesclagem. - Defina o conf
spark.databricks.delta.schema.autoMerge.enabled
Spark paratrue
o SparkSession atual.
O Databricks recomenda habilitar a evolução do esquema para cada operação de gravação em vez de definir um conf do Spark.
Quando você usa opções ou sintaxe para habilitar a evolução do esquema em uma operação de gravação, isso tem precedência sobre a conf do Spark.
Nota
Não há cláusula de evolução de esquema para INSERT INTO
instruções.
Habilitar a evolução do esquema para gravações para adicionar novas colunas
As colunas que estão presentes na consulta de origem, mas ausentes da tabela de destino, são adicionadas automaticamente como parte de uma transação de gravação quando a evolução do esquema está habilitada. Consulte Ativar evolução do esquema.
As maiúsculas e minúsculas são preservadas ao acrescentar uma nova coluna. Novas colunas são adicionadas ao final do esquema da tabela. Se as colunas adicionais estiverem em uma struct, elas serão acrescentadas ao final da struct na tabela de destino.
O exemplo a seguir demonstra o uso da opção com o mergeSchema
Auto Loader. Consulte O que é Auto Loader?.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
O exemplo a seguir demonstra o uso da mergeSchema
opção com uma operação de gravação em lote:
(spark.read
.table(source_table)
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("table_name")
)
Evolução automática do esquema para a fusão Delta Lake
A evolução do esquema permite que os usuários resolvam incompatibilidades de esquema entre a tabela de destino e de origem na mesclagem. Trata dos dois casos seguintes:
- Uma coluna na tabela de origem não está presente na tabela de destino. A nova coluna é adicionada ao esquema de destino e seus valores são inseridos ou atualizados usando os valores de origem.
- Uma coluna na tabela de destino não está presente na tabela de origem. O esquema de destino é deixado inalterado; Os valores na coluna de destino adicional são mantidos inalterados (para
UPDATE
) ou definidos comoNULL
(paraINSERT
).
Você deve habilitar manualmente a evolução automática do esquema. Consulte Ativar evolução do esquema.
Nota
No Databricks Runtime 12.2 LTS e superior, as colunas e os campos struct presentes na tabela de origem podem ser especificados pelo nome nas ações de inserção ou atualização. No Databricks Runtime 11.3 LTS e inferior, apenas INSERT *
ações ou UPDATE SET *
podem ser usadas para evolução de esquema com mesclagem.
No Databricks Runtime 13.3 LTS e superior, você pode usar a evolução do esquema com estruturas aninhadas dentro de mapas, como map<int, struct<a: int, b: int>>
.
Sintaxe de evolução do esquema para mesclagem
No Databricks Runtime 15.2 e superior, você pode especificar a evolução do esquema em uma instrução de mesclagem usando APIs de tabela SQL ou Delta:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
Python
from delta.tables import *
(targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
import io.delta.tables._
targetTable
.merge(sourceDF, "source.key = target.key")
.withSchemaEvolution()
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
Exemplo de operações de mesclagem com evolução de esquema
Aqui estão alguns exemplos dos efeitos da operação com e sem evolução do merge
esquema.
Colunas | Consulta (em SQL) | Comportamento sem evolução do esquema (padrão) | Comportamento com evolução do esquema |
---|---|---|---|
Colunas de destino: key, value Colunas de origem: key, value, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
O esquema da tabela permanece inalterado; apenas as colunas key , value são atualizadas/inseridas. |
O esquema da tabela é alterado para (key, value, new_value) . Os registros existentes com correspondências são atualizados com o value e new_value na fonte. Novas linhas são inseridas com o esquema (key, value, new_value) . |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * |
UPDATE e INSERT as ações geram um erro porque a coluna old_value de destino não está na origem. |
O esquema da tabela é alterado para (key, old_value, new_value) . Os registros existentes com correspondências são atualizados com o new_value na fonte deixando old_value inalterado. Novos registros são inseridos com o especificado key , , e NULL para o old_value new_value . |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN MATCHED THEN UPDATE SET new_value = s.new_value |
UPDATE lança um erro porque a coluna new_value não existe na tabela de destino. |
O esquema da tabela é alterado para (key, old_value, new_value) . Os registros existentes com correspondências são atualizados com o new_value na fonte deixando old_value inalterado, e registros incomparáveis foram NULL inseridos para new_value . Ver nota (1). |
Colunas de destino: key, old_value Colunas de origem: key, new_value |
MERGE INTO target_table t USING source_table s ON t.key = s.key WHEN NOT MATCHED THEN INSERT (key, new_value) VALUES (s.key, s.new_value) |
INSERT lança um erro porque a coluna new_value não existe na tabela de destino. |
O esquema da tabela é alterado para (key, old_value, new_value) . Novos registros são inseridos com o especificado key , , e NULL para o old_value new_value . Os registos existentes foram NULL introduzidos para new_value permanecerem old_value inalterados. Ver nota (1). |
(1) Esse comportamento está disponível no Databricks Runtime 12.2 LTS e superior; Databricks Runtime 11.3 LTS e abaixo erro nesta condição.
Excluir colunas com mesclagem Delta Lake
No Databricks Runtime 12.2 LTS e superior, você pode usar EXCEPT
cláusulas em condições de mesclagem para excluir explicitamente colunas. O comportamento da palavra-chave varia dependendo se a EXCEPT
evolução do esquema está habilitada ou não.
Com a evolução do esquema desativada, a EXCEPT
palavra-chave se aplica à lista de colunas na tabela de destino e permite excluir colunas de UPDATE
ou INSERT
ações. As colunas excluídas são definidas como null
.
Com a evolução do esquema habilitada, a EXCEPT
palavra-chave se aplica à lista de colunas na tabela de origem e permite excluir colunas da evolução do esquema. Uma nova coluna na origem que não está presente no destino não é adicionada ao esquema de destino se estiver listada EXCEPT
na cláusula. As colunas excluídas que já estão presentes no destino são definidas como null
.
Os exemplos a seguir demonstram essa sintaxe:
Colunas | Consulta (em SQL) | Comportamento sem evolução do esquema (padrão) | Comportamento com evolução do esquema |
---|---|---|---|
Colunas de destino: id, title, last_updated Colunas de origem: id, title, review, last_updated |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated) |
As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. Novas linhas são inseridas usando valores para id e title . O campo last_updated excluído está definido como null . O campo review é ignorado porque não está no alvo. |
As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. O esquema é evoluído para adicionar o campo review . Novas linhas são inseridas usando todos os campos de origem, exceto last_updated o que está definido como null . |
Colunas de destino: id, title, last_updated Colunas de origem: id, title, review, internal_count |
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET last_updated = current_date() WHEN NOT MATCHED THEN INSERT * EXCEPT (last_updated, internal_count) |
INSERT lança um erro porque a coluna internal_count não existe na tabela de destino. |
As linhas correspondentes são atualizadas definindo o last_updated campo para a data atual. O review campo é adicionado à tabela de destino, mas o internal_count campo é ignorado. As novas linhas inseridas foram last_updated definidas como null . |
Lidando com NullType
colunas em atualizações de esquema
Como o Parquet não oferece suporte a NullType
, NullType
as colunas são descartadas do DataFrame ao gravar em tabelas Delta, mas ainda são armazenadas no esquema. Quando um tipo de dados diferente é recebido para essa coluna, o Delta Lake mescla o esquema com o novo tipo de dados. Se o Delta Lake receber um NullType
para uma coluna existente, o esquema antigo será mantido e a nova coluna será descartada durante a gravação.
NullType
em streaming não é suportado. Como você deve definir esquemas ao usar o streaming, isso deve ser muito raro. NullType
também não é aceite para tipos complexos como ArrayType
e MapType
.
Substituir esquemas de tabelas
Por padrão, a substituição dos dados em uma tabela não substitui o esquema. Ao substituir uma tabela usando mode("overwrite")
sem replaceWhere
, você ainda pode querer substituir o esquema dos dados que estão sendo gravados. Substitua o esquema e o particionamento da tabela definindo a overwriteSchema
opção como true
:
df.write.option("overwriteSchema", "true")
Importante
Não é possível especificar overwriteSchema
como true
ao usar a substituição de partição dinâmica.