Använda deltatabeller med strömmande data
Alla data som vi har utforskat fram till nu har varit statiska data i filer. Många dataanalysscenarier omfattar dock strömmande data som måste bearbetas nästan i realtid. Du kan till exempel behöva samla in avläsningar som genereras av IoT-enheter (Internet-of-things) och lagra dem i en tabell när de inträffar. Spark bearbetar batchdata och strömmande data på samma sätt, vilket gör att strömmande data kan bearbetas i realtid med samma API.
Spark Structured Streaming
En typisk dataströmbearbetningslösning innebär att ständigt läsa en dataström från en källa, eventuellt bearbeta den för att välja specifika fält, aggregera och gruppera värden eller på annat sätt manipulera data och skriva resultatet till en mottagare.
Spark har inbyggt stöd för strömmande data via Spark Structured Streaming, ett API som baseras på en gränslös dataram där strömmande data samlas in för bearbetning. En Spark Structured Streaming-dataram kan läsa data från många olika typer av strömningskällor, inklusive:
- Nätverksportar
- Koordinatortjänster för realtidsmeddelanden, till exempel Azure Event Hubs eller Kafka
- Filsystemplatser.
Dricks
Mer information om Spark Structured Streaming finns i Programmeringsguide för strukturerad direktuppspelning i Spark-dokumentationen.
Direktuppspelning med Delta-tabeller
Du kan använda en Delta-tabell som källa eller mottagare för Spark Structured Streaming. Du kan till exempel samla in en ström med realtidsdata från en IoT-enhet och skriva strömmen direkt till en Delta-tabell som mottagare. Du kan sedan fråga tabellen för att se de senaste strömmade data. Eller så kan du läsa ett Delta som en strömmande källa, vilket möjliggör rapportering i nära realtid när nya data läggs till i tabellen.
Använda en Delta-tabell som en strömmande källa
I följande PySpark-exempel skapas en Delta-tabell för att lagra information om internetförsäljningsbeställningar:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
En hypotetisk dataström med internetbeställningar infogas i tabellen orders_in:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
För att verifiera kan du läsa och visa data från indatatabellen:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
Data läses sedan in i en strömmande DataFrame från Delta-tabellen:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Kommentar
När du använder en Delta-tabell som en strömmande källa kan endast tilläggsåtgärder inkluderas i strömmen. Dataändringar orsakar ett fel om du inte anger ignoreChanges
alternativet eller ignoreDeletes
.
Du kan kontrollera att strömmen strömmas med hjälp av egenskapen isStreaming
som ska returnera True:
# Verify that the stream is streaming
stream_df.isStreaming
Transformera dataströmmen
När du har läst data från Delta-tabellen i en strömmande DataFrame kan du använda Spark Structured Streaming API för att bearbeta dem. Du kan till exempel räkna antalet beställningar som görs varje minut och skicka de aggregerade resultaten till en nedströmsprocess för nästan realtidsvisualisering.
I det här exemplet filtreras alla rader med NULL i kolumnen Pris och nya kolumner läggs till för IsBike och Total.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Använda en Delta-tabell som en direktuppspelningsmottagare
Dataströmmen skrivs sedan till en Delta-tabell:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Kommentar
Alternativet checkpointLocation
används för att skriva en kontrollpunktsfil som spårar dataströmbearbetningens tillstånd. Med den här filen kan du återställa från fel vid den tidpunkt då dataströmbearbetningen slutade fungera.
När strömningsprocessen har startat kan du fråga Delta Lake-tabellen för att se vad som finns i utdatatabellen. Det kan uppstå en kort fördröjning innan du kan köra frågor mot tabellen.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
I resultatet av den här frågan undantas order 3005 eftersom den hade NULL i kolumnen Pris. Och de två kolumnerna som lades till under omvandlingen visas – IsBike och Total.
OrderID | OrderDate | Kund | Produkt | Quantity | Pris | IsBike | Totalt |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | Road Bike Red | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | Mountain Bike Silver | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Vägcykel gul | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | Framhjul för väg | 1 | 115 | 0 | 115 |
När du är klar stoppar du strömmande data för att undvika onödiga bearbetningskostnader med hjälp av stop
metoden:
# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()
Dricks
Mer information om hur du använder Delta-tabeller för strömmande data finns i Läsa och skriva tabelluppspelning i Delta Lake-dokumentationen.