Używanie usługi Delta Lake z danymi przesyłanymi strumieniowo
Wszystkie eksplorowane dane były danymi statycznym w plikach. Jednak wiele scenariuszy analizy danych obejmuje dane przesyłane strumieniowo , które muszą być przetwarzane niemal w czasie rzeczywistym. Na przykład może być konieczne przechwycenie odczytów emitowanych przez urządzenia internetu rzeczy (IoT) i przechowywanie ich w tabeli w miarę ich występowania.
Przesyłanie strumieniowe ze strukturą platformy Spark
Typowe rozwiązanie do przetwarzania strumieniowego polega na ciągłym odczytywaniu strumienia danych ze źródła, opcjonalnie przetwarzaniu go w celu wybrania określonych pól, agregowania i grupowania wartości lub manipulowania danymi oraz zapisywania wyników do ujścia.
Platforma Spark obejmuje natywną obsługę danych przesyłanych strumieniowo za pośrednictwem przesyłania strumieniowego ze strukturą platformy Spark, czyli interfejsu API opartego na nieograniczonej ramce danych, w której dane przesyłane strumieniowo są przechwytywane do przetwarzania. Ramka danych przesyłania strumieniowego ze strukturą platformy Spark może odczytywać dane z wielu różnych rodzajów źródła przesyłania strumieniowego, w tym portów sieciowych, usług brokera komunikatów w czasie rzeczywistym, takich jak Azure Event Hubs lub Kafka, lub lokalizacji systemu plików.
Napiwek
Aby uzyskać więcej informacji na temat przesyłania strumieniowego ze strukturą platformy Spark, zobacz Przewodnik programowania przesyłania strumieniowego ze strukturą platformy Spark w dokumentacji platformy Spark.
Przesyłanie strumieniowe za pomocą tabel usługi Delta Lake
Tabelę usługi Delta Lake można użyć jako źródła lub ujścia przesyłania strumieniowego ze strukturą platformy Spark. Można na przykład przechwycić strumień danych czasu rzeczywistego z urządzenia IoT i zapisać strumień bezpośrednio w tabeli usługi Delta Lake jako ujście — umożliwiając wykonywanie zapytań dotyczących tabeli w celu wyświetlenia najnowszych strumieniowych danych. Możesz też odczytać tabelę delty jako źródło przesyłania strumieniowego, umożliwiając ciągłe raportowanie nowych danych w miarę dodawania ich do tabeli.
Używanie tabeli usługi Delta Lake jako źródła przesyłania strumieniowego
W poniższym przykładzie PySpark tabela usługi Delta Lake służy do przechowywania szczegółów internetowych zamówień sprzedaży. Tworzony jest strumień, który odczytuje dane z folderu tabeli usługi Delta Lake w miarę dołączania nowych danych.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Uwaga
W przypadku używania tabeli usługi Delta Lake jako źródła przesyłania strumieniowego tylko operacje dołączania mogą być uwzględniane w strumieniu. Modyfikacje danych spowodują błąd, chyba że zostanie określona ignoreChanges
opcja lub ignoreDeletes
.
Po odczytaniu danych z tabeli usługi Delta Lake do ramki danych przesyłania strumieniowego możesz użyć interfejsu API przesyłania strumieniowego ze strukturą platformy Spark, aby je przetworzyć. W powyższym przykładzie ramka danych jest po prostu wyświetlana; Można jednak użyć przesyłania strumieniowego ze strukturą platformy Spark, aby zagregować dane w oknach czasowych (na przykład w celu zliczenia liczby zamówień złożonych co minutę) i wysłać zagregowane wyniki do procesu podrzędnego na potrzeby wizualizacji niemal w czasie rzeczywistym.
Używanie tabeli usługi Delta Lake jako ujścia przesyłania strumieniowego
W poniższym przykładzie PySpark strumień danych jest odczytywany z plików JSON w folderze. Dane JSON w każdym pliku zawierają stan urządzenia IoT w formacie {"device":"Dev1","status":"ok"}
Nowe dane są dodawane do strumienia za każdym razem, gdy plik zostanie dodany do folderu. Strumień wejściowy jest nieograniczoną ramką danych, która jest następnie zapisywana w formacie różnicowym w lokalizacji folderu dla tabeli usługi Delta Lake.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
Uwaga
Ta checkpointLocation
opcja służy do zapisywania pliku punktu kontrolnego, który śledzi stan przetwarzania strumienia. Ten plik umożliwia odzyskanie sprawności po awarii w punkcie, w którym przetwarzanie strumienia zostało przerwane.
Po rozpoczęciu procesu przesyłania strumieniowego możesz wysłać zapytanie do tabeli usługi Delta Lake, do której są zapisywane dane wyjściowe przesyłania strumieniowego, aby wyświetlić najnowsze dane. Na przykład poniższy kod tworzy tabelę wykazu dla folderu tabeli usługi Delta Lake i wysyła do niego zapytanie:
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Aby zatrzymać strumień danych zapisywanych w tabeli usługi Delta Lake, możesz użyć stop
metody zapytania przesyłania strumieniowego:
delta_stream.stop()
Napiwek
Aby uzyskać więcej informacji na temat używania tabel usługi Delta Lake na potrzeby danych przesyłanych strumieniowo, zobacz Odczyty i zapisy w dokumentacji usługi Delta Lake.