Usar tabelas delta com dados de streaming

Concluído

Todos os dados que exploramos até este ponto foram dados estáticos em arquivos. No entanto, muitos cenários de análise de dados envolvem streaming de dados que devem ser processados quase em tempo real. Por exemplo, talvez seja necessário capturar leituras emitidas por dispositivos de Internet das Coisas (IoT) e armazená-las em uma tabela à medida que ocorrem. O Spark processa dados em lote e dados de streaming da mesma maneira, permitindo que os dados de streaming sejam processados em tempo real usando a mesma API.

Streaming estruturado do Spark

Uma solução típica de processamento de fluxo envolve a leitura constante de um fluxo de dados de uma fonte, opcionalmente processando-o para selecionar campos específicos, agregar e agrupar valores ou manipular os dados e gravar os resultados em um coletor.

O Spark inclui suporte nativo para streaming de dados por meio do Spark Structured Streaming, uma API baseada em um dataframe ilimitado no qual os dados de streaming são capturados para processamento. Um dataframe do Spark Structured Streaming pode ler dados de muitos tipos diferentes de fonte de streaming, incluindo:

  • Portas de rede
  • Serviços de corretagem de mensagens em tempo real, como Hubs de Eventos do Azure ou Kafka
  • Localizações do sistema de ficheiros.

Gorjeta

Para obter mais informações sobre o Spark Structured Streaming, consulte Structured Streaming Programming Guide na documentação do Spark.

Streaming com tabelas Delta

Você pode usar uma tabela Delta como fonte ou coletor para o Spark Structured Streaming. Por exemplo, você pode capturar um fluxo de dados em tempo real de um dispositivo IoT e gravar o fluxo diretamente em uma tabela Delta como um coletor. Em seguida, você pode consultar a tabela para ver os dados transmitidos mais recentes. Ou você pode ler um Delta como uma fonte de streaming, permitindo relatórios quase em tempo real à medida que novos dados são adicionados à tabela.

Usando uma tabela Delta como fonte de streaming

No exemplo PySpark a seguir, uma tabela Delta é criada para armazenar detalhes de ordens de venda pela Internet:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Um fluxo de dados hipotético de ordens pela Internet é inserido na tabela orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Para verificar, você pode ler e exibir dados da tabela de entrada:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Os dados são então carregados em um DataFrame de streaming da tabela Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Nota

Ao usar uma tabela Delta como fonte de streaming, somente operações de acréscimo podem ser incluídas no fluxo. As modificações de dados causarão um erro, a menos que você especifique a ignoreChanges opção ou ignoreDeletes .

Você pode verificar se o fluxo está transmitindo usando a isStreaming propriedade que deve retornar True:

# Verify that the stream is streaming
stream_df.isStreaming

Transformar o fluxo de dados

Depois de ler os dados da tabela Delta em um DataFrame de streaming, você pode usar a API de streaming estruturado do Spark para processá-los. Por exemplo, você pode contar o número de pedidos feitos a cada minuto e enviar os resultados agregados para um processo downstream para visualização quase em tempo real.

Neste exemplo, todas as linhas com NULL na coluna Preço são filtradas e novas colunas são adicionadas para IsBike e Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Usando uma tabela Delta como um coletor de streaming

O fluxo de dados é então gravado em uma tabela Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Nota

A checkpointLocation opção é usada para gravar um arquivo de ponto de verificação que rastreia o estado do processamento do fluxo. Esse arquivo permite que você se recupere de uma falha no ponto em que o processamento de fluxo parou.

Depois que o processo de streaming for iniciado, você poderá consultar a tabela Delta Lake para ver o que está na tabela de saída. Pode haver um pequeno atraso antes que você possa consultar a tabela.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

Nos resultados desta consulta, a ordem 3005 é excluída porque tinha NULL na coluna Preço. E as duas colunas que foram adicionadas durante a transformação são exibidas - IsBike e Total.

OrderID OrderDate Cliente Produto Quantidade Preço IsBike Total
3001 2023-09-01 Yang Bicicleta de estrada vermelha 1 1200 1 1200
3002 2023-09-01 Carlson Mountain Bike Prata 1 1500 1 1500
3003 2023-09-02 Wilson Bicicleta de estrada amarelo 2 1350 1 2700
3004 2023-09-02 Yang Roda dianteira da estrada 1 115 0 115

Quando terminar, pare os dados de streaming para evitar custos de processamento desnecessários usando o stop método:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

Gorjeta

Para obter mais informações sobre como usar tabelas Delta para streaming de dados, consulte Leituras e gravações de streaming de tabela na documentação do Delta Lake.