Použití rozdílových tabulek se streamovanými daty

Dokončeno

Všechna data, která jsme v tomto okamžiku prozkoumali, jsou statická data v souborech. Mnoho scénářů analýzy dat ale zahrnuje streamovaná data, která se musí zpracovávat téměř v reálném čase. Můžete například potřebovat zachytit čtení vygenerovaná zařízeními IoT (Internet-of-things) a uložit je do tabulky, když k nim dojde. Spark zpracovává dávková data a streamovaná data stejným způsobem, což umožňuje zpracování streamovaných dat v reálném čase pomocí stejného rozhraní API.

Strukturované streamování Sparku

Typické řešení zpracování datových proudů zahrnuje:

  • Neustále čte datový proud ze zdroje.
  • Volitelně můžete data zpracovat a vybrat konkrétní pole, agregovat a seskupit hodnoty nebo jinak manipulovat s daty.
  • Zápis výsledků do jímky

Spark zahrnuje nativní podporu streamování dat prostřednictvím strukturovaného streamování Sparku, rozhraní API založené na neomezeném datovém rámci, ve kterém se streamovaná data zaznamenávají ke zpracování. Datový rámec strukturovaného streamování Sparku může číst data z mnoha různých druhů zdroje streamování, včetně:

  • Síťové porty
  • Služby zprostředkování zpráv v reálném čase, jako je Azure Event Hubs nebo Kafka
  • Umístění systému souborů.

Tip

Další informace o strukturovaném streamování Sparku najdete v průvodci programováním strukturovaného streamování v dokumentaci sparku.

Streamování s tabulkami Delta

Tabulku Delta můžete použít jako zdroj nebo jímku pro strukturované streamování Sparku. Můžete například zaznamenat datový proud dat v reálném čase ze zařízení IoT a zapsat stream přímo do tabulky Delta jako jímku. Potom se můžete dotazem na tabulku podívat na nejnovější streamovaná data. Nebo můžete číst Rozdíl jako zdroj streamování, který umožňuje generování sestav téměř v reálném čase, protože do tabulky se přidají nová data.

Použití tabulky Delta jako zdroje streamování

V následujícím příkladu PySpark se vytvoří tabulka Delta pro ukládání podrobností o internetových prodejních objednávkách:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Hypotetický datový proud internetových objednávek se vloží do tabulky orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Pokud to chcete ověřit, můžete číst a zobrazovat data ze vstupní tabulky:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Data se pak načtou do streamovaného datového rámce z tabulky Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Poznámka:

Při použití tabulky Delta jako zdroje streamování je možné do streamu zahrnout pouze operace připojení . Úpravy dat můžou způsobit chybu, pokud nezadáte ignoreChanges nebo ignoreDeletes možnost.

Stream můžete zkontrolovat pomocí isStreaming vlastnosti, která by měla vrátit hodnotu True:

# Verify that the stream is streaming
stream_df.isStreaming

Transformace datového proudu

Po načtení dat z tabulky Delta do streamovaného datového rámce můžete ke zpracování použít rozhraní API strukturovaného streamování Sparku. Můžete například spočítat počet objednávek zadaných každou minutu a odeslat agregované výsledky do podřízeného procesu pro vizualizaci téměř v reálném čase.

V tomto příkladu se vyfiltrují všechny řádky s hodnotou NULL ve sloupci Cena a pro IsBike a Total se přidají nové sloupce.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Použití tabulky Delta jako jímky streamování

Datový proud se pak zapíše do tabulky Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Poznámka:

Tato checkpointLocation možnost se používá k zápisu souboru kontrolního bodu, který sleduje stav zpracování datových proudů. Tento soubor vám umožní zotavit se z selhání v okamžiku, kdy zpracování datového proudu skončilo.

Po spuštění procesu streamování můžete dotazovat tabulku Delta Lake a zjistit, co je ve výstupní tabulce. Před dotazem na tabulku může dojít k krátké prodlevě.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

Ve výsledcích tohoto dotazu je pořadí 3005 vyloučeno, protože má hodnotu NULL ve sloupci Cena. Zobrazí se dva sloupce přidané během transformace – IsBike a Total.

OrderID OrderDate Zákazník Produkt Množství Cena IsBike Celkem
3001 2023-09-01 Jang Silniční kolo Červené 0 1200 0 1200
3002 2023-09-01 Carlson Mountain Bike Silver 0 1500 0 1500
3003 2023-09-02 Wilson Silniční kolo žlutá 2 1 350 0 2700
3004 2023-09-02 Jang Silniční přední kolo 0 115 0 115

Po dokončení zastavte streamovaná data, abyste se vyhnuli zbytečným nákladům na stop zpracování pomocí metody:

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

Tip

Další informace o používání tabulek Delta pro streamovaná data najdete v tématu Čtení a zápisy streamování tabulek v dokumentaci k Delta Lake.