Compartilhar via


Selecionar um modo de saída para o Streaming Estruturado

Este artigo discute a seleção de um modo de saída para streaming com estado. Somente streams com estado contendo agregações exigem uma configuração de modo de saída.

As junções dão suporte apenas ao modo de saída de acréscimo e o modo de saída não afeta a eliminação de duplicação. Os operadores com estado arbitrários mapGroupsWithState e flatMapGroupsWithState emitem registros usando sua própria lógica personalizada, para que o modo de saída do fluxo não afete seu comportamento.

Para streaming sem estado, todos os modos de saída se comportam da mesma forma.

Para configurar o modo de saída corretamente, é necessário entender streaming com estado, marcas d'água e gatilhos. Veja os artigos a seguir:

O que é o modo de saída?

O modo de saída de uma consulta de Streaming Estruturado determina quais registros os operadores da consulta emitem durante cada gatilho. Os três tipos de registros que podem ser emitidos são:

  • Registra que o processamento futuro não é alterado.
  • Os registros que foram alterados desde o último gatilho.
  • Todos os registros na tabela de estado.

Saber quais tipos de registros emitir é importante para os operadores com estado porque uma linha específica produzida por um operador com estado pode mudar de gatilho para gatilho. Por exemplo, como um operador de agregação de streaming recebe mais linhas para uma janela específica, os valores de agregação dessa janela podem ser alterados entre gatilhos.

Para operadores sem estado, a distinção entre tipos de registro não afeta o comportamento do operador. Os registros que um operador sem estado emite durante um gatilho são sempre os registros de origem processados durante esse gatilho.

Modos de saída disponíveis

Há três modos de saída que informam a um operador quais registros emitir durante um gatilho específico:

Modo de saída Descrição
Modo de acréscimo (padrão) Por padrão, as consultas de streaming são executadas no modo de acréscimo. Nesse modo, os operadores emitem apenas linhas que não são alteradas em gatilhos futuros. Operadores com estado usam a marca d'água para determinar quando isso acontece.
Modo de atualização No modo de atualização, os operadores emitem todas as linhas que foram alteradas durante o gatilho, mesmo que o registro emitido possa ser alterado em um gatilho subsequente.
Modo completo O modo completo funciona apenas com agregações de streaming. No modo completo, todas as linhas resultantes já produzidas pelo operador são emitidas downstream.

Considerações sobre produção

Para muitas operações de streaming com estado, você deve escolher entre os modos de acréscimo e atualização. As seções a seguir descrevem considerações que podem informar sua decisão.

Observação

O modo completo tem algumas aplicações, mas pode ter um desempenho ruim à medida que os dados escalam. O Databricks recomenda usar exibições materializadas para obter garantias semânticas associadas ao modo completo com processamento incremental para muitas operações com estado. Confira Usar exibições materializadas no Databricks SQL.

Semântica do aplicativo

A semântica do aplicativo descreve como os aplicativos downstream usam os dados de streaming.

Se os serviços downstream precisarem executar uma única ação para cada gravação downstream, use o modo de acréscimo na maioria dos casos. Por exemplo, se você tiver um serviço de notificação downstream enviando notificações para cada novo registro gravado no coletor, o modo de acréscimo garante que cada registro seja gravado apenas uma vez. O modo de atualização grava o registro sempre que as informações de estado são alteradas, o que resultaria em várias atualizações.

Se os serviços downstream precisarem de novos resultados, o modo de atualização garantirá que o coletor esteja o mais atualizado possível. Exemplos incluem um modelo de machine learning que lê recursos em tempo real ou um painel de análise que acompanha agregações em tempo real.

Compatibilidade do operador e do coletor

O Streaming Estruturado não dá suporte a todas as operações disponíveis no Apache Spark e não há suporte para algumas operações de streaming em todos os modos de saída. Para obter mais informações sobre as limitações do operador, consulte os documentos de streaming do software de código aberto.

Nem todos os coletores dão suporte a todos os modos de saída. O Delta Lake, que apoia todas as tabelas gerenciadas do Catálogo do Unity, e o Kafka dão suporte a todos os modos de saída. Para obter mais informações sobre a compatibilidade do coletor, consulte os documentos de streaming do software de código aberto.

Latência e custo

O modo de saída afeta quanto tempo deve passar antes de gravar um registro, e a frequência e a quantidade de dados gravados podem afetar os custos associados aos pipelines de streaming.

O modo de acréscimo força os operadores com estado a emitir resultados somente depois que os resultados com estado são finalizados, o que é pelo menos tão longo quanto o atraso da marca d'água. Um atraso de marca d'água de 1 hour no modo de saída de acréscimo significa que seus registros têm pelo menos um atraso de uma hora antes de serem emitidos downstream.

O modo de atualização resulta em uma gravação por gatilho por valor agregado. Se o coletor cobra por gravação por registro, isso pode ser caro se os registros forem atualizados muitas vezes antes que o atraso da marca d'água passe.

Exemplos de configuração

Os exemplos de código a seguir mostram a configuração do modo de saída para atualizações de streaming nas tabelas do Catálogo do Unity:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Consulte os documentos do software de código aberto para PySpark DataStreamWriter.outputMode ou Scala DataStreamWriter.outputMode.

Exemplo de modos de saída e streaming com estado

O exemplo a seguir visa ajudá-lo a entender como o modo de saída interage com marcas d'água para streaming com estado.

Considere uma agregação de streaming que calcula a receita total gerada a cada hora em uma loja, com um atraso de marca d'água de 15 minutos. O primeiro micro lote processa os seguintes registros:

  • US$ 15 às 14h40
  • US$ 10 às 14h30
  • US$ 30 às 15h10

Neste ponto, a marca d'água do mecanismo é 14h55 porque subtrai 15 minutos (o atraso) do tempo máximo visto (15h10). O operador de agregação de streaming tem o seguinte em seu estado:

  • [2pm, 3pm]: US$ 25
  • [3pm, 4pm]: US$ 30

A tabela a seguir descreve o que aconteceria em cada modo de saída:

Modo de saída Resultado e motivo
Acrescentar O operador de agregação de streaming não emite nada downstream. Isso ocorre porque ambas as janelas podem ser alteradas à medida que novos valores aparecem com um gatilho subsequente: a marca d'água de 14h55 indica que os registros após as 14h55 ainda podem chegar, e esses registros podem cair na janela [2pm, 3pm] ou na janela [3pm, 4pm].
Atualizar O operador emite ambos os registros, pois eles receberam atualizações.
Concluir O operador emite todos os registros.

Agora, suponha que o streaming receba mais um registro:

  • US$ 20 às 15h20

A marca d'água é atualizada para 15h05 porque o mecanismo subtrai 15 minutos a partir das 15h20. Neste ponto, o operador de agregação de streaming tem o seguinte em seu estado:

  • [2pm, 3pm]: US$ 25
  • [3pm, 4pm]: US$ 50

A tabela a seguir descreve o que aconteceria em cada modo de saída:

Modo de saída Resultado e motivo
Acrescentar O operador de agregação de streaming observa que a marca d'água das 15h05 é maior que o final da janela [2pm, 3pm]. Pela definição da marca d'água, essa janela não pode mais ser alterada, então ela emite a janela [2pm, 3pm].
Atualizar O operador de agregação de streaming emite a janela [3pm, 4pm] porque o valor do estado foi alterado de US$ 30 para US$ 50.
Concluir O operador emite todos os registros.

O seguinte resume como os operadores com estado se comportam em cada modo de acréscimo:

  • No modo de acréscimo, grave registros uma vez após o atraso da marca d'água.
  • No modo de atualização, grave registros que foram alterados desde o gatilho anterior.
  • No modo completo, grave todos os registros já produzidos pelo operador com estado.