Usar tabelas delta com dados de streaming
Todos os dados que exploramos até este ponto foram dados estáticos em arquivos. No entanto, muitos cenários de análise de dados envolvem streaming de dados que devem ser processados quase em tempo real. Por exemplo, talvez seja necessário capturar leituras emitidas por dispositivos de Internet das Coisas (IoT) e armazená-las em uma tabela à medida que ocorrem. O Spark processa dados em lote e dados de streaming da mesma maneira, permitindo que os dados de streaming sejam processados em tempo real usando a mesma API.
Streaming estruturado do Spark
Uma solução típica de processamento de fluxo envolve a leitura constante de um fluxo de dados de uma fonte, opcionalmente processando-o para selecionar campos específicos, agregar e agrupar valores ou manipular os dados e gravar os resultados em um coletor.
O Spark inclui suporte nativo para streaming de dados por meio do Spark Structured Streaming, uma API baseada em um dataframe ilimitado no qual os dados de streaming são capturados para processamento. Um dataframe do Spark Structured Streaming pode ler dados de muitos tipos diferentes de fonte de streaming, incluindo:
- Portas de rede
- Serviços de corretagem de mensagens em tempo real, como Hubs de Eventos do Azure ou Kafka
- Localizações do sistema de ficheiros.
Gorjeta
Para obter mais informações sobre o Spark Structured Streaming, consulte Structured Streaming Programming Guide na documentação do Spark.
Streaming com tabelas Delta
Você pode usar uma tabela Delta como fonte ou coletor para o Spark Structured Streaming. Por exemplo, você pode capturar um fluxo de dados em tempo real de um dispositivo IoT e gravar o fluxo diretamente em uma tabela Delta como um coletor. Em seguida, você pode consultar a tabela para ver os dados transmitidos mais recentes. Ou você pode ler um Delta como uma fonte de streaming, permitindo relatórios quase em tempo real à medida que novos dados são adicionados à tabela.
Usando uma tabela Delta como fonte de streaming
No exemplo PySpark a seguir, uma tabela Delta é criada para armazenar detalhes de ordens de venda pela Internet:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
Um fluxo de dados hipotético de ordens pela Internet é inserido na tabela orders_in:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
Para verificar, você pode ler e exibir dados da tabela de entrada:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
Os dados são então carregados em um DataFrame de streaming da tabela Delta:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Nota
Ao usar uma tabela Delta como fonte de streaming, somente operações de acréscimo podem ser incluídas no fluxo. As modificações de dados causarão um erro, a menos que você especifique a ignoreChanges
opção ou ignoreDeletes
.
Você pode verificar se o fluxo está transmitindo usando a isStreaming
propriedade que deve retornar True:
# Verify that the stream is streaming
stream_df.isStreaming
Transformar o fluxo de dados
Depois de ler os dados da tabela Delta em um DataFrame de streaming, você pode usar a API de streaming estruturado do Spark para processá-los. Por exemplo, você pode contar o número de pedidos feitos a cada minuto e enviar os resultados agregados para um processo downstream para visualização quase em tempo real.
Neste exemplo, todas as linhas com NULL na coluna Preço são filtradas e novas colunas são adicionadas para IsBike e Total.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Usando uma tabela Delta como um coletor de streaming
O fluxo de dados é então gravado em uma tabela Delta:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Nota
A checkpointLocation
opção é usada para gravar um arquivo de ponto de verificação que rastreia o estado do processamento do fluxo. Esse arquivo permite que você se recupere de uma falha no ponto em que o processamento de fluxo parou.
Depois que o processo de streaming for iniciado, você poderá consultar a tabela Delta Lake para ver o que está na tabela de saída. Pode haver um pequeno atraso antes que você possa consultar a tabela.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
Nos resultados desta consulta, a ordem 3005 é excluída porque tinha NULL na coluna Preço. E as duas colunas que foram adicionadas durante a transformação são exibidas - IsBike e Total.
OrderID | OrderDate | Cliente | Produto | Quantidade | Preço | IsBike | Total |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | Bicicleta de estrada vermelha | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | Mountain Bike Prata | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Bicicleta de estrada amarelo | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | Roda dianteira da estrada | 1 | 115 | 0 | 115 |
Quando terminar, pare os dados de streaming para evitar custos de processamento desnecessários usando o stop
método:
# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()
Gorjeta
Para obter mais informações sobre como usar tabelas Delta para streaming de dados, consulte Leituras e gravações de streaming de tabela na documentação do Delta Lake.