Obtenha dados de streaming para o lakehouse com o streaming estruturado do Spark
O Streaming Estruturado é um mecanismo de processamento de fluxo escalável e tolerante a falhas criado no Spark. O Spark se encarrega de executar a operação de streaming de forma incremental e contínua à medida que os dados continuam a chegar.
O streaming estruturado ficou disponível no Spark 2.2. Desde então, tem sido a abordagem recomendada para o streaming de dados. O princípio fundamental por trás do fluxo estruturado é tratar um fluxo de dados ao vivo como uma tabela onde novos dados são sempre continuamente acrescentados, como uma nova linha em uma tabela. Existem algumas fontes de arquivos de streaming integradas definidas, como CSV, JSON, ORC, Parquet e suporte integrado para serviços de mensagens como Kafka e Hubs de Eventos.
Este artigo fornece informações sobre como otimizar o processamento e a ingestão de eventos por meio do streaming estruturado do Spark em ambientes de produção com alta taxa de transferência. As abordagens sugeridas incluem:
- Otimização da taxa de transferência de streaming de dados
- Otimizando operações de gravação na tabela delta e
- Processamento em lote de eventos
Definições de trabalho do Spark e blocos de anotações do Spark
Os blocos de anotações Spark são uma excelente ferramenta para validar ideias e fazer experimentos para obter insights de seus dados ou código. Os notebooks são amplamente utilizados na preparação de dados, visualização, aprendizado de máquina e outros cenários de big data. As definições de trabalho do Spark são tarefas não interativas orientadas a código executadas em um cluster do Spark por longos períodos. As definições de trabalho do Spark fornecem robustez e disponibilidade.
Os notebooks Spark são uma excelente fonte para testar a lógica do seu código e atender a todos os requisitos de negócios. No entanto, para mantê-lo em execução em um cenário de produção, as definições de trabalho do Spark com a Política de Repetição habilitada são a melhor solução.
Política de repetição para definições de trabalho do Spark
No Microsoft Fabric, o usuário pode definir uma política de repetição para trabalhos de definição de trabalho do Spark. Embora o script no trabalho possa ser infinito, a infraestrutura que executa o script pode incorrer em um problema que exija a interrupção do trabalho. Ou o trabalho pode ser eliminado devido às necessidades de correção da infraestrutura subjacente. A política de repetição permite que o usuário defina regras para reiniciar automaticamente o trabalho se ele parar devido a quaisquer problemas subjacentes. Os parâmetros especificam com que frequência o trabalho deve ser reiniciado, até infinitas tentativas, e definindo o tempo entre as tentativas. Dessa forma, os usuários podem garantir que seus trabalhos de definição de trabalho do Spark continuem sendo executados infinitamente até que o usuário decida pará-los.
Fontes de streaming
A configuração de streaming com Hubs de Eventos requer configuração básica, que inclui o nome do namespace dos Hubs de Eventos, o nome do hub, o nome da chave de acesso compartilhado e o grupo de consumidores. Um grupo de consumidores é uma visão de um hub de eventos inteiro. Ele permite que vários aplicativos consumidores tenham uma visão separada do fluxo de eventos e leiam o fluxo de forma independente em seu próprio ritmo e com seus deslocamentos.
As partições são uma parte essencial da capacidade de lidar com um grande volume de dados. Um único processador tem uma capacidade limitada para lidar com eventos por segundo, enquanto vários processadores podem fazer um trabalho melhor quando executados em paralelo. As partições permitem a possibilidade de processar grandes volumes de eventos em paralelo.
Se muitas partições forem usadas com uma baixa taxa de ingestão, os leitores de partições lidam com uma pequena porção desses dados, causando um processamento não ideal. O número ideal de partições depende diretamente da taxa de processamento desejada. Se quiser dimensionar o processamento de eventos, considere adicionar mais partições. Não há limite de taxa de transferência específico em uma partição. No entanto, a taxa de transferência agregada em seu namespace é limitada pelo número de unidades de taxa de transferência. À medida que você aumenta o número de unidades de taxa de transferência em seu namespace, convém que partições extras permitam que leitores simultâneos atinjam sua taxa de transferência máxima.
A recomendação é investigar e testar o melhor número de partições para o seu cenário de taxa de transferência. Mas é comum ver cenários com alta taxa de transferência usando 32 ou mais partições.
O Conector de Hubs de Eventos do Azure para Apache Spark (azure-event-hubs-spark) é recomendado para conectar o aplicativo Spark aos Hubs de Eventos do Azure.
Lakehouse como pia de fluxo
O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID (atomicidade, consistência, isolamento e durabilidade) sobre soluções de armazenamento de data lake. O Delta Lake também suporta manipulação de metadados escaláveis, evolução do esquema, viagem no tempo (versionamento de dados), formato aberto e outros recursos.
No Fabric Data Engineering, o Delta Lake é usado para:
- Facilmente upsert (inserir/atualizar) e excluir dados usando o Spark SQL.
- Compacte dados para minimizar o tempo gasto consultando dados.
- Visualize o estado das tabelas antes e depois da execução das operações.
- Recupere um histórico de operações executadas em tabelas.
Delta é adicionado como um dos possíveis formatos de coletores de saída usados no writeStream. Para obter mais informações sobre os coletores de saída existentes, consulte Spark Structured Streaming Programming Guide.
O exemplo a seguir demonstra como é possível transmitir dados para o Delta Lake.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
Sobre o código cortado no exemplo:
- format() é a instrução que define o formato de saída dos dados.
- outputMode() define de que maneira as novas linhas no streaming são gravadas (ou seja, acrescentar, substituir).
- toTable() persiste os dados transmitidos em uma tabela Delta criada usando o valor passado como parâmetro.
Otimizando gravações Delta
O particionamento de dados é uma parte crítica na criação de uma solução de streaming robusta: o particionamento melhora a forma como os dados são organizados e também melhora a taxa de transferência. Os arquivos são facilmente fragmentados após operações Delta, resultando em muitos arquivos pequenos. E arquivos muito grandes também são um problema, devido ao longo tempo para escrevê-los no disco. O desafio com o particionamento de dados é encontrar o equilíbrio adequado que resulta em tamanhos de arquivo ideais. O Spark suporta particionamento na memória e no disco. Dados particionados corretamente podem fornecer o melhor desempenho ao persistir dados no Delta Lake e consultar dados do Delta Lake.
- Ao particionar dados no disco, você pode escolher como particionar os dados com base em colunas usando partitionBy(). partitionBy() é uma função usada para particionar grandes modelos semânticos em arquivos menores com base em uma ou várias colunas fornecidas durante a gravação no disco. O particionamento é uma maneira de melhorar o desempenho da consulta ao trabalhar com um modelo semântico grande. Evite escolher uma coluna que gere partições muito pequenas ou muito grandes. Defina uma partição com base em um conjunto de colunas com uma boa cardinalidade e divida os dados em arquivos de tamanho ideal.
- O particionamento de dados na memória pode ser feito usando transformações repartition() ou coalesce(), distribuindo dados em vários nós de trabalho e criando várias tarefas que podem ler e processar dados em paralelo usando os fundamentos do Resilient Distributed Dataset (RDD). Permite dividir o modelo semântico em partições lógicas, que podem ser calculadas em diferentes nós do cluster.
- repartition() é usado para aumentar ou diminuir o número de partições na memória. A repartição reorganiza dados inteiros na rede e equilibra-os em todas as partições.
- coalesce() é usado apenas para diminuir o número de partições de forma eficiente. Essa é uma versão otimizada de repartition() onde o movimento de dados em todas as partições é menor usando coalesce().
Combinar ambas as abordagens de particionamento é uma boa solução em cenários com alta taxa de transferência. repartition() cria um número específico de partições na memória, enquanto partitionBy() grava arquivos no disco para cada partição de memória e coluna de particionamento. O exemplo a seguir ilustra o uso de ambas as estratégias de particionamento no mesmo trabalho do Spark: os dados são primeiro divididos em 48 partições na memória (supondo que tenhamos um total de 48 núcleos de CPU) e, em seguida, particionados no disco com base em duas colunas existentes na carga útil.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Escrita otimizada
Outra opção para otimizar as gravações no Delta Lake é usar a gravação otimizada. A gravação otimizada é um recurso opcional que melhora a maneira como os dados são gravados na tabela Delta. O Spark mescla ou divide as partições antes de gravar os dados, maximizando a taxa de transferência dos dados que estão sendo gravados no disco. No entanto, ele incorre em embaralhamento total, portanto, para algumas cargas de trabalho, pode causar uma degradação de desempenho. Os trabalhos que usam coalesce() e/ou repartition() para particionar dados no disco podem ser refatorados para começar a usar a Gravação Otimizada.
O código a seguir é um exemplo do uso da Gravação Otimizada. Observe que partitionBy() ainda é usado.
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Eventos de processamento em lote
A fim de minimizar o número de operações para melhorar o tempo gasto na ingestão de dados no lago Delta, os eventos em lote são uma alternativa prática.
Os gatilhos definem com que frequência uma consulta de streaming deve ser executada (acionada) e emitem novos dados. Configurá-los define um intervalo de tempo de processamento periódico para microlotes, acumulando dados e eventos em lote em poucas operações persistentes, em vez de gravar em disco o tempo todo.
O exemplo a seguir mostra uma consulta de streaming em que os eventos são processados periodicamente em intervalos de um minuto.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
A vantagem de combinar lotes de eventos em operações de escrita de tabelas Delta é que cria arquivos Delta maiores com mais dados neles, evitando arquivos pequenos. Você deve analisar a quantidade de dados que estão sendo ingeridos e encontrar o melhor tempo de processamento para otimizar o tamanho dos arquivos Parquet criados pela biblioteca Delta.
Monitorização
O Spark 3.1 e versões superiores têm uma interface do usuário de streaming estruturada integrada contendo as seguintes métricas de streaming:
- Taxa de entrada
- Taxa de processo
- Linhas de entrada
- Duração do lote
- Duração da Operação
Conteúdos relacionados
- Obtenha dados de streaming no lakehouse e acesse com o endpoint de análise SQL.