Usar tabelas Delta com dados de streaming

Concluído

Todos os dados que exploramos até agora foram dados estáticos em arquivos. No entanto, muitos cenários de análise de dados envolvem dados de streaming, que precisam ser processados quase em tempo real. Por exemplo, talvez seja necessário capturar leituras emitidas por dispositivos IoT (Internet das Coisas) e armazená-las em uma tabela conforme elas ocorrem. O Spark processa dados em lote e dados de stream contínuo da mesma maneira, permitindo que dados de stream contínuo sejam processados em tempo real usando a mesma API.

Streaming estruturado do Spark

Uma solução típica de processamento de fluxo envolve a leitura constante de um fluxo de dados de uma origem, opcionalmente processando-os para selecionar campos específicos, agregar e agrupar valores ou manipular os dados e gravar os resultados em um coletor.

O Spark inclui suporte nativo para streaming de dados por meio do Streaming Estruturado do Spark, uma API baseada em um dataframe sem limites, no qual os dados de streaming são capturados para processamento. Um DataFrame de Streaming Estruturado do Spark pode ler dados de diferentes tipos de fontes de streaming, incluindo:

  • Portas de rede
  • Serviços de intermediação de mensagens em tempo real, como Hubs de Eventos do Azure ou Kafka
  • Locais do sistema de arquivos.

Dica

Para obter mais informações sobre o Streaming Estruturado do Spark, confira o Guia de programação de Streaming Estruturado na documentação do Spark.

Streaming com tabelas Delta

Você pode usar uma tabela Delta como origem ou como coletor para o Streaming Estruturado do Spark. Por exemplo, você pode capturar um fluxo de dados em tempo real de um dispositivo de IoT e gravar o stream diretamente em uma tabela Delta como coletor. Você pode então consultar a tabela para ver os dados mais recentes transmitidos. Ou você pode fazer a leitura de um Delta como uma fonte de streaming, possibilitando relatórios quase em tempo real à medida que novos dados são adicionados à tabela.

Usando uma tabela Delta como uma fonte de streaming

No exemplo a seguir do PySpark, uma tabela Delta é criada para armazenar detalhes dos pedidos de vendas pela Internet:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Um fluxo de dados hipotético de pedidos pela Internet é inserido na tabela orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Para verificar, você pode ler e exibir os dados da tabela de entrada:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Os dados são então carregados em um DataFrame de streaming a partir da tabela Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Observação

Ao usar uma tabela Delta como uma fonte de streaming, apenas operações de acréscimo podem ser incluídas no stream. Modificações de dados causarão um erro, a menos que você especifique a opção ignoreChanges ou ignoreDeletes.

Você pode verificar se o stream está em andamento usando a propriedade isStreaming, que deve retornar Verdadeiro:

# Verify that the stream is streaming
stream_df.isStreaming

Transformar o fluxo de dados

Após fazer a leitura dos dados da tabela Delta em um DataFrame de streaming, você poderá usar a API de Streaming Estruturado do Spark para processá-los. Por exemplo, você pode contar o número de pedidos feitos a cada minuto e enviar os resultados agregados para um processo downstream para visualização quase em tempo real.

Neste exemplo, todas as linhas com NULO na coluna Preço são filtradas e novas colunas são adicionadas para IsBike e Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Usando uma tabela Delta como um coletor de streaming

O fluxo de dados é então gravado em uma tabela Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Observação

A opção checkpointLocation é usada para gravar um arquivo de ponto de verificação que rastreia o estado do processamento do fluxo. Esse arquivo permite que você se recupere de uma falha no ponto em que o processamento de fluxo parou.

Após o início do processo de streaming, você poderá consultar a tabela Delta Lake para ver o que está na tabela de saída. Pode haver um pequeno atraso antes que você possa consultar a tabela.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

Nos resultados dessa consulta, o pedido 3005 é excluído porque tinha NULO na coluna Preço. E as duas colunas que foram adicionadas durante a transformação são exibidas: IsBike e Total.

OrderID OrderDate Cliente Produto Quantidade Preço IsBike Total
3001 2023-09-01 Yang Road Bike Vermelha 1 1200 1 1200
3002 2023-09-01 Carlson Mountain Bike Prata 1 1500 1 1500
3003 2023-09-02 Wilson Road Bike Amarela 2 1350 1 2700
3004 2023-09-02 Yang Roda Dianteira de Estrada 1 115 0 115

Quando terminar, pare o stream contínuo de dados para evitar custos de processamento desnecessários, utilizando o método stop:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

Dica

Para mais informações sobre o uso de tabelas Delta para stream contínuo de dados, confira Leituras e gravações de streaming em tabela na documentação do Delta Lake.