Use o Delta Lake com dados de streaming
Todos os dados que exploramos até este ponto foram dados estáticos em arquivos. No entanto, muitos cenários de análise de dados envolvem streaming de dados que devem ser processados quase em tempo real. Por exemplo, talvez seja necessário capturar leituras emitidas por dispositivos de Internet das Coisas (IoT) e armazená-las em uma tabela à medida que ocorrem.
Streaming estruturado do Spark
Uma solução típica de processamento de fluxo envolve a leitura constante de um fluxo de dados de uma fonte, opcionalmente processando-o para selecionar campos específicos, agregar e agrupar valores ou manipular os dados e gravar os resultados em um coletor.
O Spark inclui suporte nativo para streaming de dados por meio do Spark Structured Streaming, uma API baseada em um dataframe ilimitado no qual os dados de streaming são capturados para processamento. Um dataframe do Spark Structured Streaming pode ler dados de muitos tipos diferentes de fonte de streaming, incluindo portas de rede, serviços de corretagem de mensagens em tempo real, como Hubs de Eventos do Azure ou Kafka, ou locais do sistema de arquivos.
Gorjeta
Para obter mais informações sobre o Spark Structured Streaming, consulte Structured Streaming Programming Guide na documentação do Spark.
Streaming com tabelas Delta Lake
Você pode usar uma tabela Delta Lake como fonte ou coletor para o Spark Structured Streaming. Por exemplo, você pode capturar um fluxo de dados em tempo real de um dispositivo IoT e gravar o fluxo diretamente em uma tabela Delta Lake como um coletor - permitindo que você consulte a tabela para ver os dados transmitidos mais recentes. Ou, você pode ler uma tabela delta como uma fonte de streaming, permitindo que você relate constantemente novos dados à medida que eles são adicionados à tabela.
Usando uma tabela Delta Lake como fonte de streaming
No exemplo PySpark a seguir, uma tabela Delta Lake é usada para armazenar detalhes de ordens de venda pela Internet. É criado um fluxo que lê dados da pasta da tabela Delta Lake à medida que novos dados são acrescentados.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Nota
Ao usar uma tabela Delta Lake como fonte de streaming, apenas operações de acréscimo podem ser incluídas no fluxo. As modificações de dados causarão um erro, a menos que você especifique a ignoreChanges
opção ou ignoreDeletes
.
Depois de ler os dados da tabela Delta Lake em um dataframe de streaming, você pode usar a API de streaming estruturado do Spark para processá-los. No exemplo acima, o dataframe é simplesmente exibido; mas você pode usar o Spark Structured Streaming para agregar os dados em janelas temporais (por exemplo, para contar o número de pedidos feitos a cada minuto) e enviar os resultados agregados para um processo downstream para visualização quase em tempo real.
Usando uma tabela Delta Lake como um coletor de streaming
No exemplo PySpark a seguir, um fluxo de dados é lido de arquivos JSON em uma pasta. Os dados JSON em cada arquivo contêm o status de um dispositivo IoT no formato {"device":"Dev1","status":"ok"}
Novos dados são adicionados ao fluxo sempre que um arquivo é adicionado à pasta. O fluxo de entrada é um dataframe ilimitado, que é então gravado em formato delta em um local de pasta para uma tabela Delta Lake.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
Nota
A checkpointLocation
opção é usada para gravar um arquivo de ponto de verificação que rastreia o estado do processamento do fluxo. Esse arquivo permite que você se recupere de uma falha no ponto em que o processamento de fluxo parou.
Depois que o processo de streaming for iniciado, você poderá consultar a tabela Delta Lake na qual a saída de streaming está sendo gravada para ver os dados mais recentes. Por exemplo, o código a seguir cria uma tabela de catálogo para a pasta de tabela Delta Lake e a consulta:
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Para interromper o fluxo de dados que está sendo gravado na tabela Delta Lake, você pode usar o stop
método da consulta de streaming:
delta_stream.stop()
Gorjeta
Para obter mais informações sobre como usar tabelas Delta Lake para streaming de dados, consulte Leituras e gravações de streaming de tabela na documentação do Delta Lake.