Usar o Delta Lake com o tipo de dados de streaming
Todos os dados que exploramos até agora foram dados estáticos em arquivos. No entanto, muitos cenários de análise de dados envolvem dados de streaming, que precisam ser processados quase em tempo real. Por exemplo, talvez seja necessário capturar leituras emitidas por dispositivos IoT (Internet das Coisas) e armazená-las em uma tabela conforme elas ocorrem.
Streaming estruturado do Spark
Uma solução típica de processamento de fluxo envolve a leitura constante de um fluxo de dados de uma origem, opcionalmente processando-os para selecionar campos específicos, agregar e agrupar valores ou manipular os dados e gravar os resultados em um coletor.
O Spark inclui suporte nativo para streaming de dados por meio do Streaming Estruturado do Spark, uma API baseada em um dataframe sem limites, no qual os dados de streaming são capturados para processamento. Um dataframe de Streaming Estruturado do Spark pode ler dados de vários tipos diferentes de fonte de streaming, incluindo portas de rede, serviços de intermediação de mensagens em tempo real, como Hubs de Eventos do Azure ou Kafka, ou locais do sistema de arquivos.
Dica
Para obter mais informações sobre o Streaming Estruturado do Spark, confira o Guia de programação de Streaming Estruturado na documentação do Spark.
Streaming com tabelas do Delta Lake
Você pode usar uma tabela do Delta Lake como uma origem ou um coletor para o Streaming Estruturado do Spark. Por exemplo, você pode capturar um fluxo de dados em tempo real de um dispositivo IoT e gravar o fluxo diretamente em uma tabela do Delta Lake como um coletor – permitindo que você consulte a tabela para ver os dados transmitidos mais recentes. Alternativamente, você pode ler uma Tabela Delta como uma origem de streaming, permitindo que você relate constantemente novos dados conforme eles são adicionados à tabela.
Usar uma tabela do Delta Lake como uma origem de streaming
No exemplo de PySpark a seguir, uma tabela do Delta Lake é usada para armazenar detalhes de pedidos de vendas na Internet. Um fluxo é criado que lê dados da pasta da tabela do Delta Lake à medida que novos dados são acrescentados.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Observação
Ao usar uma tabela do Delta Lake como origem de streaming, somente operações de acréscimo podem ser incluídas no fluxo. Modificações de dados causarão um erro, a menos que você especifique a opção ignoreChanges
ou ignoreDeletes
.
Depois de ler os dados da tabela do Delta Lake em um dataframe de streaming, você pode usar a API de Streaming Estruturado do Spark para processá-los. No exemplo acima, o dataframe é simplesmente exibido, mas você pode usar o Streaming Estruturado do Spark para agregar os dados em janelas temporais (por exemplo, para contar o número de pedidos feitos a cada minuto) e enviar os resultados agregados para um processo downstream para visualização quase em tempo real.
Usar uma tabela do Delta Lake como um coletor de streaming
No exemplo de PySpark a seguir, um fluxo de dados é lido de arquivos JSON em uma pasta. Os dados JSON em cada arquivo contêm o status de um dispositivo IoT no formato {"device":"Dev1","status":"ok"}
Novos dados são adicionados ao fluxo sempre que um arquivo é adicionado à pasta. O fluxo de entrada é um dataframe sem limites, que é gravado em formato delta em um local de pasta para uma tabela do Delta Lake.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
Observação
A opção checkpointLocation
é usada para gravar um arquivo de ponto de verificação que rastreia o estado do processamento do fluxo. Esse arquivo permite que você se recupere de uma falha no ponto em que o processamento de fluxo parou.
Depois que o processo de streaming for iniciado, você poderá consultar a tabela do Delta Lake na qual a saída de streaming está sendo gravada para ver os dados mais recentes. Por exemplo, o seguinte código cria uma tabela de catálogo para a pasta de tabela do Delta Lake e consulta essa tabela:
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Para impedir que o fluxo de dados seja gravado na tabela do Delta Lake, você pode usar o método stop
da consulta de streaming:
delta_stream.stop()
Dica
Para obter mais informações sobre como usar tabelas do Delta Lake para dados de streaming, confira Leituras e gravações de streaming de tabela na documentação do Delta Lake.