Usar o feed de dados de alteração do Delta Lake no Azure Databricks
O feed de dados de alteração permite que o Azure Databricks rastreie alterações no nível da linha entre versões de uma tabela Delta. Quando habilitado em uma tabela Delta, os registros de tempo de execução alteram eventos para todos os dados gravados na tabela. Isso inclui os dados da linha juntamente com metadados que indicam se a linha especificada foi inserida, excluída ou atualizada.
Importante
O feed de dados de alteração funciona em conjunto com o histórico de tabelas para fornecer informações de alteração. Como a clonagem de uma tabela Delta cria um histórico separado, o feed de dados de alteração em tabelas clonadas não corresponde ao da tabela original.
Processar dados de alteração incrementalmente
O Databricks recomenda o uso do feed de dados de alteração em combinação com o Streaming Estruturado para processar incrementalmente as alterações das tabelas Delta. Você deve usar o Streaming Estruturado para Azure Databricks para controlar automaticamente as versões do feed de dados de alteração da tabela.
Nota
Delta Live Tables fornece funcionalidade para fácil propagação de dados de alteração e armazenamento de resultados como tabelas SCD (dimensão de mudança lenta) tipo 1 ou tipo 2. Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com o Delta Live Tables.
Para ler o feed de dados de alteração de uma tabela, você deve habilitar o feed de dados de alteração nessa tabela. Consulte Ativar feed de dados de alteração.
Defina a opção readChangeFeed
como true
ao configurar um fluxo em relação a uma tabela para ler o feed de dados de alteração, conforme mostrado no exemplo de sintaxe a seguir:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Por padrão, o fluxo retorna o instantâneo mais recente da tabela quando o fluxo é iniciado pela primeira vez como um INSERT
e alterações futuras como dados de alteração.
Os dados de alteração são confirmados como parte da transação Delta Lake e ficam disponíveis ao mesmo tempo em que os novos dados são confirmados na tabela.
Opcionalmente, você pode especificar uma versão inicial. Consulte Devo especificar uma versão inicial?.
O feed de dados de alteração também suporta a execução em lote, o que requer a especificação de uma versão inicial. Consulte Ler alterações em consultas em lote.
Opções como limites de taxa (maxFilesPerTrigger
, maxBytesPerTrigger
) e excludeRegex
também são suportadas ao ler dados de alteração.
O limite de taxa pode ser atômico para versões diferentes da versão inicial do snapshot. Ou seja, toda a versão de confirmação será limitada ou toda a confirmação será retornada.
Devo especificar uma versão inicial?
Opcionalmente, você pode especificar uma versão inicial se quiser ignorar as alterações que aconteceram antes de uma versão específica. Você pode especificar uma versão usando um carimbo de data/hora ou o número de ID da versão registrado no log de transações Delta.
Nota
Uma versão inicial é necessária para leituras em lote, e muitos padrões de lote podem se beneficiar da configuração de uma versão final opcional.
Ao configurar cargas de trabalho de Streaming Estruturado envolvendo feed de dados de alteração, é importante entender como a especificação de uma versão inicial afeta o processamento.
Muitas cargas de trabalho de streaming, especialmente novos pipelines de processamento de dados, se beneficiam do comportamento padrão. Com o comportamento padrão, o primeiro lote é processado quando o fluxo registra pela primeira vez todos os registros existentes na tabela como INSERT
operações no feed de dados de alteração.
Se a tabela de destino já contiver todos os registros com alterações apropriadas até um determinado ponto, especifique uma versão inicial para evitar o processamento do estado da tabela de origem como INSERT
eventos.
O exemplo a seguir recupera a sintaxe de uma falha de streaming na qual o ponto de verificação foi corrompido. Neste exemplo, suponha as seguintes condições:
- O feed de dados de alteração foi habilitado na tabela de origem na criação da tabela.
- A tabela de destino a jusante processou todas as alterações até à versão 75, inclusive.
- O histórico de versões da tabela de origem está disponível para as versões 70 e superiores.
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
Neste exemplo, você também deve especificar um novo local de ponto de verificação.
Importante
Se você especificar uma versão inicial, o fluxo não será iniciado a partir de um novo ponto de verificação se a versão inicial não estiver mais presente no histórico da tabela. O Delta Lake limpa as versões históricas automaticamente, o que significa que todas as versões iniciais especificadas são eventualmente excluídas.
Consulte Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma tabela?.
Ler alterações em consultas em lote
Você pode usar a sintaxe de consulta em lote para ler todas as alterações a partir de uma versão específica ou para ler as alterações dentro de um intervalo especificado de versões.
Você especifica uma versão como um inteiro e um carimbo de data/hora como uma cadeia de caracteres no formato yyyy-MM-dd[ HH:mm:ss[.SSS]]
.
As versões inicial e final são incluídas nas consultas. Para ler as alterações de uma determinada versão inicial para a versão mais recente da tabela, especifique apenas a versão inicial.
Se você fornecer uma versão inferior ou um carimbo de data/hora mais antigo do que um que registrou eventos de alteração, ou seja, quando o feed de dados de alteração foi habilitado, um erro será lançado indicando que o feed de dados de alteração não estava habilitado.
Os exemplos de sintaxe a seguir demonstram o uso de opções de versão inicial e final com leituras em lote:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Python
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Nota
Por padrão, se um usuário passar em uma versão ou carimbo de data/hora excedendo a última confirmação em uma tabela, o erro timestampGreaterThanLatestCommit
será lançado. No Databricks Runtime 11.3 LTS e superior, o feed de dados de alteração pode lidar com o caso da versão fora do intervalo se o usuário definir a seguinte configuração como true
:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Se você fornecer uma versão inicial maior que a última confirmação em uma tabela ou um carimbo de data/hora de início mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada, um resultado de leitura vazio será retornado.
Se você fornecer uma versão final maior que a última confirmação em uma tabela ou um carimbo de data/hora final mais recente que a última confirmação em uma tabela, quando a configuração anterior estiver habilitada no modo de leitura em lote, todas as alterações entre a versão inicial e a última confirmação serão retornadas.
Qual é o esquema para o feed de dados de alteração?
Quando você lê a partir do feed de dados de alteração de uma tabela, o esquema para a versão mais recente da tabela é usado.
Nota
A maioria das operações de mudança e evolução de esquema são totalmente suportadas. A tabela com mapeamento de colunas ativado não suporta todos os casos de uso e demonstra um comportamento diferente. Consulte Alterar limitações do feed de dados para tabelas com mapeamento de colunas habilitado.
Além das colunas de dados do esquema da tabela Delta, o feed de dados de alteração contém colunas de metadados que identificam o tipo de evento de alteração:
Nome da coluna | Type | Valores |
---|---|---|
_change_type |
String | insert , update_preimage , update_postimage , ( delete 1) |
_commit_version |
Longo | A versão de log ou tabela Delta que contém a alteração. |
_commit_timestamp |
Carimbo de Data/Hora | O carimbo de data/hora associado quando a confirmação foi criada. |
(1) preimage
é o valor antes da atualização, postimage
é o valor após a atualização.
Nota
Não é possível habilitar a alteração do feed de dados em uma tabela se o esquema contiver colunas com os mesmos nomes dessas colunas adicionadas. Renomeie colunas na tabela para resolver esse conflito antes de tentar habilitar o feed de dados de alteração.
Ativar feed de dados de alteração
Você só pode ler o feed de dados de alteração para tabelas habilitadas. Você deve habilitar explicitamente a opção de feed de dados de alteração usando um dos seguintes métodos:
Nova tabela: defina a propriedade
delta.enableChangeDataFeed = true
table noCREATE TABLE
comando.CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
Tabela existente: defina a propriedade
delta.enableChangeDataFeed = true
table noALTER TABLE
comando.ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Todas as novas tabelas:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Importante
Somente as alterações feitas depois que você habilita o feed de dados de alteração são registradas. As alterações anteriores em uma tabela não são capturadas.
Alterar o armazenamento de dados
Habilitar o feed de dados de alteração causa um pequeno aumento nos custos de armazenamento de uma tabela. Os registros de dados de alteração são gerados à medida que a consulta é executada e geralmente são muito menores do que o tamanho total dos arquivos regravados.
O Azure Databricks registra dados de alteração para UPDATE
, DELETE
e MERGE
operações na _change_data
pasta sob o diretório da tabela. Algumas operações, como operações somente inserção e exclusões de partição completa, não geram dados no diretório porque o _change_data
Azure Databricks pode calcular com eficiência o feed de dados de alteração diretamente do log de transações.
Todas as leituras em arquivos de dados na _change_data
pasta devem passar por APIs Delta Lake suportadas.
Os arquivos na pasta seguem a _change_data
política de retenção da tabela. Os dados do feed de dados de alteração são excluídos quando o VACUUM
comando é executado.
Posso usar o feed de dados de alteração para reproduzir todo o histórico de uma mesa?
O feed de dados de alteração não se destina a servir como um registro permanente de todas as alterações em uma tabela. O feed de dados de alteração só regista as alterações que ocorrem depois de ativado.
O feed de dados de alteração e o Delta Lake permitem que você sempre reconstrua um instantâneo completo de uma tabela de origem, o que significa que você pode iniciar uma nova leitura de streaming em relação a uma tabela com o feed de dados de alteração habilitado e capturar a versão atual dessa tabela e todas as alterações que ocorrerem depois.
Você deve tratar os registros no feed de dados de alteração como transitórios e acessíveis apenas para uma janela de retenção especificada. O log de transações Delta remove as versões da tabela e suas versões de feed de dados de alteração correspondentes em intervalos regulares. Quando uma versão é removida do log de transações, você não pode mais ler o feed de dados de alteração dessa versão.
Se o seu caso de uso exigir a manutenção de um histórico permanente de todas as alterações em uma tabela, você deve usar a lógica incremental para gravar registros do feed de dados de alteração em uma nova tabela. O exemplo de código a seguir demonstra o uso trigger.AvailableNow
do , que aproveita o processamento incremental do Structured Streaming, mas processa os dados disponíveis como uma carga de trabalho em lote. Você pode agendar essa carga de trabalho de forma assíncrona com seus pipelines de processamento principais para criar um backup do feed de dados de alteração para fins de auditoria ou replayability total.
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Alterar limitações de feed de dados para tabelas com mapeamento de colunas habilitado
Com o mapeamento de colunas habilitado em uma tabela Delta, você pode soltar ou renomear colunas na tabela sem reescrever arquivos de dados para dados existentes. Com o mapeamento de colunas habilitado, o feed de dados de alteração tem limitações depois de executar alterações de esquema não aditivas, como renomear ou soltar uma coluna, alterar o tipo de dados ou alterações de anulabilidade.
Importante
- Não é possível ler o feed de dados de alteração para uma transação ou intervalo no qual ocorre uma alteração de esquema não aditiva usando semântica em lote.
- No Databricks Runtime 12.2 LTS e inferior, as tabelas com mapeamento de colunas habilitado que sofreram alterações de esquema não aditivas não suportam leituras de streaming no feed de dados de alteração. Consulte Transmissão em fluxo com mapeamento de colunas e alterações de esquema.
- No Databricks Runtime 11.3 LTS e abaixo, não é possível ler o feed de dados de alteração para tabelas com mapeamento de colunas habilitado que sofreram renomeação ou queda de coluna.
No Databricks Runtime 12.2 LTS e superior, você pode executar leituras em lote no feed de dados de alteração para tabelas com mapeamento de colunas habilitado que sofreram alterações de esquema não aditivas. Em vez de usar o esquema da versão mais recente da tabela, as operações de leitura usam o esquema da versão final da tabela especificada na consulta. As consultas ainda falham se o intervalo de versão especificado abranger uma alteração de esquema não aditiva.