Použití rozdílových tabulek se streamovanými daty
Všechna data, která jsme v tomto okamžiku prozkoumali, jsou statická data v souborech. Mnoho scénářů analýzy dat ale zahrnuje streamovaná data, která se musí zpracovávat téměř v reálném čase. Můžete například potřebovat zachytit čtení vygenerovaná zařízeními IoT (Internet-of-things) a uložit je do tabulky, když k nim dojde. Spark zpracovává dávková data a streamovaná data stejným způsobem, což umožňuje zpracování streamovaných dat v reálném čase pomocí stejného rozhraní API.
Strukturované streamování Sparku
Typické řešení zpracování datových proudů zahrnuje:
- Neustále čte datový proud ze zdroje.
- Volitelně můžete data zpracovat a vybrat konkrétní pole, agregovat a seskupit hodnoty nebo jinak manipulovat s daty.
- Zápis výsledků do jímky
Spark zahrnuje nativní podporu streamování dat prostřednictvím strukturovaného streamování Sparku, rozhraní API založené na neomezeném datovém rámci, ve kterém se streamovaná data zaznamenávají ke zpracování. Datový rámec strukturovaného streamování Sparku může číst data z mnoha různých druhů zdroje streamování, včetně:
- Síťové porty
- Služby zprostředkování zpráv v reálném čase, jako je Azure Event Hubs nebo Kafka
- Umístění systému souborů.
Tip
Další informace o strukturovaném streamování Sparku najdete v průvodci programováním strukturovaného streamování v dokumentaci sparku.
Streamování s tabulkami Delta
Tabulku Delta můžete použít jako zdroj nebo jímku pro strukturované streamování Sparku. Můžete například zaznamenat datový proud dat v reálném čase ze zařízení IoT a zapsat stream přímo do tabulky Delta jako jímku. Potom se můžete dotazem na tabulku podívat na nejnovější streamovaná data. Nebo můžete číst Rozdíl jako zdroj streamování, který umožňuje generování sestav téměř v reálném čase, protože do tabulky se přidají nová data.
Použití tabulky Delta jako zdroje streamování
V následujícím příkladu PySpark se vytvoří tabulka Delta pro ukládání podrobností o internetových prodejních objednávkách:
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
Hypotetický datový proud internetových objednávek se vloží do tabulky orders_in:
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
Pokud to chcete ověřit, můžete číst a zobrazovat data ze vstupní tabulky:
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
Data se pak načtou do streamovaného datového rámce z tabulky Delta:
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Poznámka:
Při použití tabulky Delta jako zdroje streamování je možné do streamu zahrnout pouze operace připojení . Úpravy dat můžou způsobit chybu, pokud nezadáte ignoreChanges
nebo ignoreDeletes
možnost.
Stream můžete zkontrolovat pomocí isStreaming
vlastnosti, která by měla vrátit hodnotu True:
# Verify that the stream is streaming
stream_df.isStreaming
Transformace datového proudu
Po načtení dat z tabulky Delta do streamovaného datového rámce můžete ke zpracování použít rozhraní API strukturovaného streamování Sparku. Můžete například spočítat počet objednávek zadaných každou minutu a odeslat agregované výsledky do podřízeného procesu pro vizualizaci téměř v reálném čase.
V tomto příkladu se vyfiltrují všechny řádky s hodnotou NULL ve sloupci Cena a pro IsBike a Total se přidají nové sloupce.
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Použití tabulky Delta jako jímky streamování
Datový proud se pak zapíše do tabulky Delta:
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Poznámka:
Tato checkpointLocation
možnost se používá k zápisu souboru kontrolního bodu, který sleduje stav zpracování datových proudů. Tento soubor vám umožní zotavit se z selhání v okamžiku, kdy zpracování datového proudu skončilo.
Po spuštění procesu streamování můžete dotazovat tabulku Delta Lake a zjistit, co je ve výstupní tabulce. Před dotazem na tabulku může dojít k krátké prodlevě.
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
Ve výsledcích tohoto dotazu je pořadí 3005 vyloučeno, protože má hodnotu NULL ve sloupci Cena. Zobrazí se dva sloupce přidané během transformace – IsBike a Total.
OrderID | OrderDate | Zákazník | Produkt | Množství | Cena | IsBike | Celkem |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Jang | Silniční kolo Červené | 0 | 1200 | 0 | 1200 |
3002 | 2023-09-01 | Carlson | Mountain Bike Silver | 0 | 1500 | 0 | 1500 |
3003 | 2023-09-02 | Wilson | Silniční kolo žlutá | 2 | 1 350 | 0 | 2700 |
3004 | 2023-09-02 | Jang | Silniční přední kolo | 0 | 115 | 0 | 115 |
Po dokončení zastavte streamovaná data, abyste se vyhnuli zbytečným nákladům na stop
zpracování pomocí metody:
# Stop the streaming data to avoid excessive processing costs
deltastream.stop()
Tip
Další informace o používání tabulek Delta pro streamovaná data najdete v tématu Čtení a zápisy streamování tabulek v dokumentaci k Delta Lake.