Używanie tabel różnicowych z danymi przesyłanymi strumieniowo

Ukończone

Wszystkie dane, które zbadaliśmy do tego momentu, to dane statyczne w plikach. Jednak wiele scenariuszy analizy danych obejmuje dane przesyłane strumieniowo , które muszą być przetwarzane niemal w czasie rzeczywistym. Na przykład może być konieczne przechwycenie odczytów emitowanych przez urządzenia internetu rzeczy (IoT) i przechowywanie ich w tabeli w miarę ich występowania. Platforma Spark przetwarza dane wsadowe i dane przesyłane strumieniowo w ten sam sposób, umożliwiając przetwarzanie danych przesyłanych strumieniowo w czasie rzeczywistym przy użyciu tego samego interfejsu API.

Przesyłanie strumieniowe ze strukturą platformy Spark

Typowe rozwiązanie do przetwarzania strumieniowego obejmuje:

  • Stale odczytywanie strumienia danych ze źródła.
  • Opcjonalnie przetwarzanie danych w celu wybrania określonych pól, agregowania i grupowania wartości lub manipulowania danymi w inny sposób.
  • Zapisywanie wyników w ujściu.

Platforma Spark obejmuje natywną obsługę danych przesyłanych strumieniowo za pośrednictwem przesyłania strumieniowego ze strukturą platformy Spark, czyli interfejsu API opartego na nieograniczonej ramce danych, w której dane przesyłane strumieniowo są przechwytywane do przetwarzania. Ramka danych przesyłania strumieniowego ze strukturą platformy Spark może odczytywać dane z wielu różnych rodzajów źródła przesyłania strumieniowego, w tym:

  • Porty sieciowe
  • Usługi brokera komunikatów w czasie rzeczywistym, takie jak Azure Event Hubs lub Kafka
  • Lokalizacje systemu plików.

Napiwek

Aby uzyskać więcej informacji na temat przesyłania strumieniowego ze strukturą platformy Spark, zobacz Przewodnik programowania przesyłania strumieniowego ze strukturą platformy Spark w dokumentacji platformy Spark.

Przesyłanie strumieniowe za pomocą tabel delty

Tabelę delty można użyć jako źródła lub ujścia przesyłania strumieniowego ze strukturą platformy Spark. Można na przykład przechwycić strumień danych czasu rzeczywistego z urządzenia IoT i zapisać strumień bezpośrednio w tabeli delty jako ujście. Następnie możesz wykonać zapytanie względem tabeli, aby wyświetlić najnowsze dane przesyłane strumieniowo. Możesz też odczytać funkcję delta jako źródło przesyłania strumieniowego, umożliwiając raportowanie niemal w czasie rzeczywistym, gdy nowe dane są dodawane do tabeli.

Używanie tabeli delty jako źródła przesyłania strumieniowego

W poniższym przykładzie PySpark jest tworzona tabela delty do przechowywania szczegółów internetowych zamówień sprzedaży:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Hipotetyczny strumień danych zamówień internetowych jest wstawiany do tabeli orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Aby sprawdzić, możesz odczytywać i wyświetlać dane z tabeli wejściowej:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Dane są następnie ładowane do przesyłania strumieniowego ramki danych z tabeli delty:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Uwaga

W przypadku używania tabeli delty jako źródła przesyłania strumieniowego tylko operacje dołączania mogą być uwzględniane w strumieniu. Modyfikacje danych mogą spowodować błąd, chyba że zostanie określona ignoreChanges opcja lub ignoreDeletes .

Możesz sprawdzić, czy strumień jest przesyłany strumieniowo, używając isStreaming właściwości , która powinna zwrócić wartość True:

# Verify that the stream is streaming
stream_df.isStreaming

Przekształcanie strumienia danych

Po odczytaniu danych z tabeli delta do przesyłania strumieniowego ramki danych możesz użyć interfejsu API przesyłania strumieniowego ze strukturą platformy Spark, aby je przetworzyć. Można na przykład policzyć liczbę zamówień złożonych co minutę i wysłać zagregowane wyniki do procesu podrzędnego na potrzeby wizualizacji niemal w czasie rzeczywistym.

W tym przykładzie wszystkie wiersze o wartości NULL w kolumnie Cena są filtrowane, a nowe kolumny są dodawane dla kolumn IsBike i Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Używanie tabeli delty jako ujścia przesyłania strumieniowego

Strumień danych jest następnie zapisywany w tabeli delty:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Uwaga

Ta checkpointLocation opcja służy do zapisywania pliku punktu kontrolnego, który śledzi stan przetwarzania strumienia. Ten plik umożliwia odzyskanie sprawności po awarii w punkcie, w którym przetwarzanie strumienia zostało przerwane.

Po rozpoczęciu procesu przesyłania strumieniowego możesz wykonać zapytanie względem tabeli usługi Delta Lake, aby zobaczyć, co znajduje się w tabeli wyjściowej. Zanim będzie można wykonać zapytanie dotyczące tabeli, może wystąpić krótkie opóźnienie.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

W wynikach tego zapytania kolejność 3005 jest wykluczona, ponieważ miała wartość NULL w kolumnie Price (Cena). Zostaną wyświetlone dwie kolumny, które zostały dodane podczas transformacji — IsBike i Total.

OrderID OrderDate (Data zamówienia) Klient Rezultat Ilość Cena IsBike Łącznie
3001 2023-09-01 Yang Road Bike Red 1 1200 1 1200
3002 2023-09-01 Carlson Mountain Bike Silver 1 1500 1 1500
3003 2023-09-02 Wilson Rower drogowy żółty 2 1350 1 2700
3004 2023-09-02 Yang Koło przednie drogi 1 210 0 210

Po zakończeniu zatrzymaj dane przesyłane strumieniowo, aby uniknąć niepotrzebnych kosztów przetwarzania przy użyciu stop metody :

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

Napiwek

Aby uzyskać więcej informacji na temat używania tabel delty na potrzeby danych przesyłanych strumieniowo, zobacz Artykuł Odczyty i zapisy w dokumentacji usługi Delta Lake.