Substituir dados seletivamente com o Delta Lake
O Azure Databricks aproveita a funcionalidade Delta Lake para dar suporte a duas opções distintas para substituições seletivas:
- A
replaceWhere
opção substitui atomicamente todos os registros que correspondem a um determinado predicado. - Você pode substituir diretórios de dados com base em como as tabelas são particionadas usando substituições de partição dinâmica.
Para a maioria das operações, o Databricks recomenda o uso replaceWhere
para especificar quais dados devem ser substituídos.
Importante
Se os dados tiverem sido substituídos acidentalmente, você poderá usar restaurar para desfazer a alteração.
Substituição seletiva arbitrária com replaceWhere
Você pode substituir seletivamente apenas os dados que correspondem a uma expressão arbitrária.
Nota
SQL requer Databricks Runtime 12.2 LTS ou superior.
O comando a seguir substitui atomicamente os eventos de janeiro na tabela de destino, que é particionada por start_date
, pelos dados em replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Este código de exemplo grava os dados no , valida que todas as linhas correspondem replace_data
ao predicado e executa uma substituição atômica usando overwrite
semântica. Se quaisquer valores na operação estiverem fora da restrição, essa operação falhará com um erro por padrão.
Você pode alterar esse comportamento para overwrite
valores dentro do intervalo de predicados e insert
registros que estão fora do intervalo especificado. Para fazer isso, desative a verificação de restrição definindo spark.databricks.delta.replaceWhere.constraintCheck.enabled
como false usando uma das seguintes configurações:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Comportamento herdado
Na configuração padrão herdada, replaceWhere
substituía dados que correspondiam a um predicado apenas nas colunas de partição. Com esse modelo herdado, o comando a seguir substituiria atomicamente o mês de janeiro na tabela de destino, que é particionada por date
, com os dados em df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Se você quiser voltar ao comportamento antigo, você pode desativar o spark.databricks.delta.replaceWhere.dataColumns.enabled
sinalizador:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Substituições de partição dinâmica
Importante
Esta funcionalidade está em Pré-visualização Pública.
O Databricks Runtime 11.3 LTS e superior suporta modo de substituição de partição dinâmica para tabelas particionadas. Para tabelas com várias partições, o Databricks Runtime 11.3 LTS e inferior só suporta substituições de partições dinâmicas se todas as colunas de partição forem do mesmo tipo de dados.
Quando no modo de substituição de partição dinâmica, as operações substituem todos os dados existentes em cada partição lógica para a qual a gravação executa novos dados. Todas as partições lógicas existentes para as quais a gravação não contém dados permanecem inalteradas. Esse modo só é aplicável quando os dados estão sendo gravados no modo de substituição: INSERT OVERWRITE
em SQL ou em uma gravação DataFrame com df.write.mode("overwrite")
.
Configure o modo de substituição dinâmica de partições ajustando a configuração da sessão do Spark de spark.sql.sources.partitionOverwriteMode
para dynamic
. Você também pode habilitar isso definindo a DataFrameWriter
opção partitionOverwriteMode
como dynamic
. Se presente, a opção específica da consulta substitui o modo definido na configuração da sessão. O padrão para partitionOverwriteMode
é static
.
Importante
Valide que os dados gravados com substituição de partição dinâmica afetam apenas as partições esperadas. Uma única linha na partição incorreta pode levar à substituição involuntária de uma partição inteira.
O exemplo a seguir demonstra o uso de substituições de partição dinâmica:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Nota
- A substituição dinâmica de partições entra em conflito com a opção
replaceWhere
para tabelas particionadas.- Se a substituição de partição dinâmica estiver habilitada na configuração de sessão do Spark e
replaceWhere
for fornecida como uma opçãoDataFrameWriter
, o Delta Lake substituirá os dados de acordo com a expressãoreplaceWhere
(opções específicas de consulta substituem as configurações de sessão). - Você receberá um erro se as opções de
DataFrameWriter
tiverem tanto a substituição de partição dinâmica comoreplaceWhere
habilitadas.
- Se a substituição de partição dinâmica estiver habilitada na configuração de sessão do Spark e
- Não é possível especificar
overwriteSchema
comotrue
ao usar a substituição de partição dinâmica.