Delta-tabellen gebruiken met streaminggegevens
Alle gegevens die we tot nu toe hebben verkend, zijn statische gegevens in bestanden. Veel scenario's voor gegevensanalyse omvatten echter streaminggegevens die in bijna realtime moeten worden verwerkt. U moet bijvoorbeeld mogelijk leesbewerkingen vastleggen die worden verzonden door IoT-apparaten (Internet of Things) en deze opslaan in een tabel wanneer ze optreden. Spark verwerkt batchgegevens en streaminggegevens op dezelfde manier, waardoor streaminggegevens in realtime kunnen worden verwerkt met dezelfde API.
Spark Structured Streaming
Een typische oplossing voor stroomverwerking omvat het voortdurend lezen van een gegevensstroom uit een bron, optioneel verwerken om specifieke velden te selecteren, aggregatie- en groepswaarden te selecteren, of de gegevens op een andere manier te manipuleren en de resultaten naar een sink te schrijven.
Spark bevat systeemeigen ondersteuning voor streaminggegevens via Spark Structured Streaming, een API die is gebaseerd op een gebonden dataframe waarin streaminggegevens worden vastgelegd voor verwerking. Een Spark Structured Streaming-gegevensframe kan gegevens lezen uit veel verschillende soorten streamingbron, waaronder:
- Netwerkpoorten
- Realtime berichtenbrokerservices zoals Azure Event Hubs of Kafka
- Bestandssysteemlocaties.
Tip
Zie de Spark-documentatie voor meer informatie over Spark Structured Streaming Programming Guide .
Streamen met Delta-tabellen
U kunt een Delta-tabel gebruiken als bron of sink voor Spark Structured Streaming. U kunt bijvoorbeeld een stroom realtimegegevens van een IoT-apparaat vastleggen en de stream rechtstreeks naar een Delta-tabel schrijven als sink. Vervolgens kunt u een query uitvoeren op de tabel om de meest recente gestreamde gegevens weer te geven. U kunt een Delta ook lezen als streamingbron, waardoor bijna realtime rapportage mogelijk wordt wanneer er nieuwe gegevens aan de tabel worden toegevoegd.
Een Delta-tabel gebruiken als streamingbron
In het volgende PySpark-voorbeeld wordt een Delta-tabel gemaakt om details van internetverkooporders op te slaan:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
Een hypothetische gegevensstroom van internetorders wordt ingevoegd in de orders_in tabel:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
U kunt dit controleren door gegevens uit de invoertabel te lezen en weer te geven:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
De gegevens worden vervolgens vanuit de Delta-tabel geladen in een streaming DataFrame:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Notitie
Wanneer u een Delta-tabel als streamingbron gebruikt, kunnen alleen toevoegbewerkingen in de stream worden opgenomen. Gegevenswijzigingen veroorzaken een fout, tenzij u de ignoreChanges
of ignoreDeletes
optie opgeeft.
U kunt controleren of de stream wordt gestreamd met behulp van de isStreaming
eigenschap die Waar moet retourneren:
# Verify that the stream is streaming
stream_df.isStreaming
De gegevensstroom transformeren
Nadat u de gegevens uit de Delta-tabel hebt gelezen in een streaming DataFrame, kunt u de Spark Structured Streaming-API gebruiken om deze te verwerken. U kunt bijvoorbeeld het aantal orders tellen dat elke minuut is geplaatst en de geaggregeerde resultaten verzenden naar een downstreamproces voor bijna realtime visualisatie.
In dit voorbeeld worden rijen met NULL in de kolom Prijs gefilterd en worden nieuwe kolommen toegevoegd voor IsBike en Totaal.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Een Delta-tabel gebruiken als streaming-sink
De gegevensstroom wordt vervolgens naar een Delta-tabel geschreven:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Notitie
De checkpointLocation
optie wordt gebruikt om een controlepuntbestand te schrijven waarmee de status van de stroomverwerking wordt bijgehouden. Met dit bestand kunt u herstellen van een fout op het punt waar de stroomverwerking is gebleven.
Nadat het streamingproces is gestart, kunt u een query uitvoeren op de Delta Lake-tabel om te zien wat er in de uitvoertabel staat. Er kan een korte vertraging optreden voordat u een query kunt uitvoeren op de tabel.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
In de resultaten van deze query wordt order 3005 uitgesloten omdat deze NULL had in de kolom Prijs. En de twee kolommen die tijdens de transformatie zijn toegevoegd, worden weergegeven - IsBike en Totaal.
OrderID | OrderDate | Customer | Product | Quantity | Prijs | IsBike | Totaal |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | Road Bike Rood | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | Mountainbike Zilver | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Road Bike Geel | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | Weg voorwiel | 1 | 115 | 0 | 115 |
Wanneer u klaar bent, stopt u de streaminggegevens om onnodige verwerkingskosten te voorkomen met behulp van de stop
methode:
# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()
Tip
Zie Tabelstreaming-lees- en schrijfbewerkingen in de Delta Lake-documentatie voor meer informatie over het gebruik van Delta-tabellen voor streaminggegevens.