Compartilhar via


Obter dados de streaming para o lakehouse com streaming estruturado do Spark

O Streaming estruturado é um mecanismo de processamento de fluxo escalonável e tolerante a falhas integrado no Spark. O Spark cuida da execução da operação de streaming de forma incremental e contínua à medida que os dados continuam a chegar.

O streaming estruturado ficou disponível no Spark 2.2. Desde então, tem sido a abordagem recomendada para streaming de dados. O princípio fundamental por trás do fluxo estruturado é tratar um fluxo de dados ao vivo como uma tabela em que novos dados são sempre acrescentados continuamente, como uma nova linha em uma tabela. Há algumas fontes de arquivo de streaming internas definidas, como CSV, JSON, ORC, Parquet e suporte integrado para serviços de mensagens como Kafka e Hubs de Eventos.

Esse artigo fornece insights sobre como otimizar o processamento e a ingestão de eventos por meio do streaming estruturado do Spark em ambientes de produção com alta taxa de transferência. As abordagens sugeridas incluem:

  • Otimização de taxa de transferência de streaming de dados
  • Otimização de operações de gravação na tabela do Delta e
  • Envio em lote de eventos

Definições de trabalho do Spark e notebooks do Spark

Os notebooks do Spark são uma excelente ferramenta para validar ideias e fazer experimentos para obter insights de seus dados ou código. Os notebooks são amplamente utilizados na preparação, visualização, no aprendizado de máquina e em outros cenários de big data. As definições de trabalho do Spark são tarefas não interativas baseadas em código em execução em um cluster Spark por longos períodos. As definições de trabalho do Spark fornecem robustez e disponibilidade.

Notebooks do Spark são excelentes fontes para testar a lógica do seu código e atender a todos os requisitos de negócios. No entanto, para mantê-lo em execução em um cenário de produção, as definições de trabalho do Spark com a Política de repetição habilitada são a melhor solução.

Política de repetição para definições de trabalho do Spark

No Microsoft Fabric, o usuário pode definir uma política de repetição para trabalhos de Definição de trabalho do Spark. Embora o script no trabalho possa ser infinito, a infraestrutura que executa o script pode incorrer em um problema que exige a interrupção do trabalho. Ou o trabalho pode ser eliminado devido às necessidades subjacentes de aplicação de patch da infraestrutura. A política de repetição permite que o usuário defina regras para reiniciar automaticamente o trabalho se ele for interrompido devido a problemas subjacentes. Os parâmetros especificam a frequência com que o trabalho deve ser reiniciado, até tentativas infinitas, e definem o tempo entre repetições. Dessa forma, os usuários podem garantir que seus trabalhos de Definição de trabalho do Spark continuem sendo executados infinitamente até que o usuário decida interrompê-los.

Fontes de streaming

A configuração do streaming com Os Hubs de Eventos exige configuração básica, que inclui o nome do namespace dos Hubs de Eventos, o nome do hub, o nome da chave de acesso compartilhado e o grupo de consumidores. Um grupo de consumidores é uma exibição de um hub de eventos inteiro. Isso permite que vários aplicativos de consumo tenham um modo de exibição separado do Eventstream e leiam o fluxo de forma independente em seu próprio ritmo e com seus próprios deslocamentos.

As partições são uma parte essencial de ser capaz de lidar com um alto volume de dados. Um único processador tem uma capacidade limitada para lidar com eventos por segundo, enquanto vários processadores podem fazer um trabalho melhor quando executados em paralelo. As partições permitem a possibilidade de processar grandes volumes de eventos em paralelo.

Se muitas partições forem usadas com uma taxa de ingestão baixa, os leitores de partição lidarão com uma pequena parte desses dados, resultando em processamento não ideal. O número ideal de partições depende diretamente da taxa de processamento desejada. Se você quer escalonar o processamento do seu evento, considere adicionar mais partições. Não há limite de taxa de transferência específico em uma partição. No entanto, a taxa de transferência agregada em seu namespace é limitada pelo número de unidades de produtividade. Conforme você aumenta o número de unidades de produtividade no seu namespace, considere partições adicionais para permitir que cada um dos leitores simultâneos alcance sua taxa de transferência máxima.

A recomendação é investigar e testar o melhor número de partições para seu cenário de taxa de transferência. Mas é comum ver cenários com alta taxa de transferência usando 32 ou mais partições.

O Conector de Hubs de Eventos do Azure para Apache Spark (azure-event-hubs-spark) é recomendado para conectar o aplicativo Spark aos Hubs de Eventos do Azure.

Lakehouse como coletor de streaming

O Delta Lake é uma camada de armazenamento de software livre que fornece transações ACID (atomicidade, consistência, isolamento e durabilidade) sobre as soluções de armazenamento do data lake. O Delta Lake também dá suporte à manipulação de metadados escalonáveis, evolução do esquema, viagem no tempo (controle de versão de dados), formato aberto e outros recursos.

Na Engenharia de Dados do Fabric, o Delta Lake é usado para:

  • Fazer upsert (inserir/atualizar) facilmente e excluir dados usando o Spark SQL.
  • Compactar dados para minimizar o tempo gasto consultando dados.
  • Exibir o estado das tabelas antes e depois da execução das operações.
  • Recupere um histórico de operações executadas em tabelas.

O Delta é adicionado como um dos formatos de coletores de saída possíveis usados no writeStream. Para obter mais informações sobre os coletores de saída existentes, consulte Guia de Programação de Streaming Estruturado do Spark.

O exemplo a seguir demonstra como é possível transmitir dados para o Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Sobre o snippet de código no exemplo:

  • format() é a instrução que define o formato de saída dos dados.
  • outputMode() define de que maneira as novas linhas no streaming são gravadas (ou seja, acrescentadas, substituídas).
  • toTable() persiste os dados transmitidos para uma tabela do Delta criada usando o valor passado como parâmetro.

Otimizar gravações do Delta

O particionamento de dados é uma parte essencial na criação de uma solução de streaming robusta: o particionamento melhora a forma como os dados são organizados e também melhora a taxa de transferência. Os arquivos são fragmentados facilmente após as operações do Delta, resultando em muitos arquivos pequenos. E arquivos muito grandes também são um problema, devido ao longo tempo para gravá-los no disco. O desafio com o particionamento de dados é encontrar o equilíbrio adequado que resulta em tamanhos de arquivo ideais. O Spark dá suporte ao particionamento na memória e no disco. Os dados particionados corretamente podem fornecer o melhor desempenho ao persistir dados no Delta Lake e consultar dados no Delta Lake.

  • Ao particionar dados no disco, você pode escolher como particionar os dados com base em colunas usando partitionBy(). partitionBy() é uma função usada para particionar grandes modelos semânticos em arquivos menores com base em uma ou várias colunas fornecidas durante a gravação em disco. O particionamento é uma maneira de melhorar o desempenho da consulta ao trabalhar com um grande modelo semântico. Evite escolher uma coluna que gere partições muito pequenas ou muito grandes. Defina uma partição com base em um conjunto de colunas com uma boa cardinalidade e divida os dados em arquivos de tamanho ideal.
  • O particionamento de dados na memória pode ser feito usando transformações repartition() ou coalesce(), distribuindo dados em vários nós de trabalho e criando várias tarefas que podem ler e processar dados em paralelo usando os conceitos básicos do RDD (Conjunto de Dados Distribuídos Resilientes). Ele permite dividir o modelo semântico em partições lógicas, que podem ser computadas em nós diferentes do cluster.
    • repartition() é usado para aumentar ou diminuir o número de partições na memória. A repartição reformula dados inteiros pela rede e os equilibra em todas as partições.
    • coalesce() só é usado para diminuir o número de partições com eficiência. Essa é uma versão otimizada de repartition() em que a movimentação de dados em todas as partições é menor usando coalesce().

Combinar ambas as abordagens de particionamento é uma boa solução no cenário com alta taxa de transferência. repartition() cria um número específico de partições na memória, enquanto partitionBy() grava arquivos em disco para cada partição de memória e coluna de particionamento. O exemplo a seguir ilustra o uso de ambas as estratégias de particionamento no mesmo trabalho do Spark: os dados são divididos primeiro em 48 partições na memória (supondo que tenhamos um total de 48 núcleos de CPU) e particionados em disco com base em duas colunas existentes no conteúdo.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Gravação otimizada

Outra opção para otimizar gravações no Delta Lake é usar a Gravação Otimizada. A Gravação Otimizada é um recurso opcional que melhora a forma como os dados são gravados na tabela do Delta. O Spark mescla ou divide as partições antes de gravar os dados, maximizando a taxa de transferência dos dados que estão sendo gravados no disco. No entanto, ele incorre em ordem aleatória total, portanto, para algumas cargas de trabalho, pode causar uma degradação de desempenho. Trabalhos que usam coalesce() e/ou repartition() para particionar dados em disco podem ser refatorados para começar a usar a Gravação Otimizada.

O código a seguir é um exemplo do uso de Gravação Otimizada. Observe que partitionBy() ainda é usado.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Eventos de envio em lote

Para minimizar o número de operações para melhorar o tempo gasto na ingestão de dados no Delta Lake, os eventos de envio em lote são uma alternativa prática.

Os gatilhos definem a frequência com que uma consulta de streaming deve ser executada (disparada) e emitir novos dados. Configurá-los define um intervalo de tempo de processamento periódico para microlotes, acumulando dados e agrupando eventos em poucas operações persistentes, em vez de gravar no disco o tempo todo.

O exemplo a seguir mostra uma consulta de streaming em que os eventos são processados periodicamente em intervalos de um minuto.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

A vantagem de combinar o envio em lote de eventos em operações de gravação de tabela do Delta é que ele cria arquivos do Delta maiores com mais dados, evitando arquivos pequenos. Você deve analisar a quantidade de dados que estão sendo ingeridos e encontrar o melhor tempo de processamento para otimizar o tamanho dos arquivos Parquet criados pela biblioteca do Delta.

Monitoramento

O Spark 3.1 e versões superiores têm uma interface do usuário de streaming estruturada interna contendo as seguintes métricas de streaming:

  • Taxa de entrada
  • Taxa de processamento
  • Linhas de entrada
  • Duração do lote
  • Duração da operação