Delta-tabellen gebruiken met streaminggegevens

Voltooid

Alle gegevens die we tot nu toe hebben verkend, zijn statische gegevens in bestanden. Veel scenario's voor gegevensanalyse omvatten echter streaminggegevens die in bijna realtime moeten worden verwerkt. U moet bijvoorbeeld mogelijk leesbewerkingen vastleggen die worden verzonden door IoT-apparaten (Internet of Things) en deze opslaan in een tabel wanneer ze optreden. Spark verwerkt batchgegevens en streaminggegevens op dezelfde manier, waardoor streaminggegevens in realtime kunnen worden verwerkt met dezelfde API.

Spark Structured Streaming

Een typische oplossing voor stroomverwerking omvat het voortdurend lezen van een gegevensstroom uit een bron, optioneel verwerken om specifieke velden te selecteren, aggregatie- en groepswaarden te selecteren, of de gegevens op een andere manier te manipuleren en de resultaten naar een sink te schrijven.

Spark bevat systeemeigen ondersteuning voor streaminggegevens via Spark Structured Streaming, een API die is gebaseerd op een gebonden dataframe waarin streaminggegevens worden vastgelegd voor verwerking. Een Spark Structured Streaming-gegevensframe kan gegevens lezen uit veel verschillende soorten streamingbron, waaronder:

  • Netwerkpoorten
  • Realtime berichtenbrokerservices zoals Azure Event Hubs of Kafka
  • Bestandssysteemlocaties.

Streamen met Delta-tabellen

U kunt een Delta-tabel gebruiken als bron of sink voor Spark Structured Streaming. U kunt bijvoorbeeld een stroom realtimegegevens van een IoT-apparaat vastleggen en de stream rechtstreeks naar een Delta-tabel schrijven als sink. Vervolgens kunt u een query uitvoeren op de tabel om de meest recente gestreamde gegevens weer te geven. U kunt een Delta ook lezen als streamingbron, waardoor bijna realtime rapportage mogelijk wordt wanneer er nieuwe gegevens aan de tabel worden toegevoegd.

Een Delta-tabel gebruiken als streamingbron

In het volgende PySpark-voorbeeld wordt een Delta-tabel gemaakt om details van internetverkooporders op te slaan:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Een hypothetische gegevensstroom van internetorders wordt ingevoegd in de orders_in tabel:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

U kunt dit controleren door gegevens uit de invoertabel te lezen en weer te geven:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

De gegevens worden vervolgens vanuit de Delta-tabel geladen in een streaming DataFrame:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Notitie

Wanneer u een Delta-tabel als streamingbron gebruikt, kunnen alleen toevoegbewerkingen in de stream worden opgenomen. Gegevenswijzigingen veroorzaken een fout, tenzij u de ignoreChanges of ignoreDeletes optie opgeeft.

U kunt controleren of de stream wordt gestreamd met behulp van de isStreaming eigenschap die Waar moet retourneren:

# Verify that the stream is streaming
stream_df.isStreaming

De gegevensstroom transformeren

Nadat u de gegevens uit de Delta-tabel hebt gelezen in een streaming DataFrame, kunt u de Spark Structured Streaming-API gebruiken om deze te verwerken. U kunt bijvoorbeeld het aantal orders tellen dat elke minuut is geplaatst en de geaggregeerde resultaten verzenden naar een downstreamproces voor bijna realtime visualisatie.

In dit voorbeeld worden rijen met NULL in de kolom Prijs gefilterd en worden nieuwe kolommen toegevoegd voor IsBike en Totaal.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Een Delta-tabel gebruiken als streaming-sink

De gegevensstroom wordt vervolgens naar een Delta-tabel geschreven:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Notitie

De checkpointLocation optie wordt gebruikt om een controlepuntbestand te schrijven waarmee de status van de stroomverwerking wordt bijgehouden. Met dit bestand kunt u herstellen van een fout op het punt waar de stroomverwerking is gebleven.

Nadat het streamingproces is gestart, kunt u een query uitvoeren op de Delta Lake-tabel om te zien wat er in de uitvoertabel staat. Er kan een korte vertraging optreden voordat u een query kunt uitvoeren op de tabel.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

In de resultaten van deze query wordt order 3005 uitgesloten omdat deze NULL had in de kolom Prijs. En de twee kolommen die tijdens de transformatie zijn toegevoegd, worden weergegeven - IsBike en Totaal.

OrderID OrderDate Customer Product Quantity Prijs IsBike Totaal
3001 2023-09-01 Yang Road Bike Rood 1 1200 1 1200
3002 2023-09-01 Carlson Mountainbike Zilver 1 1500 1 1500
3003 2023-09-02 Wilson Road Bike Geel 2 1350 1 2700
3004 2023-09-02 Yang Weg voorwiel 1 115 0 115

Wanneer u klaar bent, stopt u de streaminggegevens om onnodige verwerkingskosten te voorkomen met behulp van de stop methode:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()