ストリーミング データにデルタ テーブルを使用する

完了

ここまで調べてきたすべてのデータは、ファイル内の静的データでした。 しかし、多くのデータ分析シナリオでは、準リアルタイムで処理する必要がある "ストリーミング" データを使用します。 たとえば、モノのインターネット (IoT) デバイスによって生成された読み取り値をキャプチャし、それらが発生したときにテーブルに格納する必要がある場合があります。 Spark はバッチ データとストリーミング データを同じ方法で処理するため、同じ API を使用してストリーミング データをリアルタイムで処理できます。

Spark Structured Streaming

一般的なストリーム処理ソリューションには、次が含まれます。

  • "ソース" から絶えずデータのストリームを読み取ります。
  • 必要に応じてデータを処理して特定のフィールドを選択したり、値を集計およびグループ化したり、またはデータを操作したりします。
  • 結果を "シンク" に書き込みます。

Spark には、ストリーミング データのネイティブ サポートが、Spark Structured Streaming によって含まれています。これは、ストリーミング データをキャプチャして処理する境界のないデータフレームに基づく API です。 Spark Structured Streaming データフレームは、次のようなさまざまなストリーミング ソースからデータを読み取ることができます。

  • ネットワーク ポート
  • Azure Event Hubs や Kafka などのリアル タイム メッセージ ブローカー サービス
  • ファイル システムの場所。

ヒント

Spark Structured Streaming について詳しくは、Spark ドキュメントの「Structured Streaming Programming Guide (Structured Streaming プログラミング ガイド)」を参照してください。

Delta テーブルを使用したストリーミング

Delta テーブルは、Spark Structured Streaming の "ソース" または "シンク" として使用できます。 たとえば、IoT デバイスからリアル タイム データのストリームを取り込み、そのストリームをシンクとして Delta テーブルに直接書き込むことができます。 次に、テーブルに対してクエリを実行し、最新のストリーミング データを確認できます。 または、ストリーミング ソースとして差分を読み取り、新しいデータがテーブルに追加されたときに準リアルタイムのレポートを実現することもできます。

Delta テーブルをストリーミング ソースとして使用する

次の PySpark の例では、インターネット販売注文の詳細を格納する Delta テーブルが作成されます。

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

インターネット注文の架空のデータ ストリームが orders_in テーブルに挿入されます。

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

確認するには、入力テーブルからデータを読み取り、表示します。

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

次に、データは Delta テーブルからストリーミング DataFrame に読み込まれます。

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Note

Delta テーブルをストリーミング ソースとして使用する場合、"追加" 操作のみをストリームに含めることができます。 ignoreChanges または ignoreDeletes オプションを指定しない限り、データを変更するとエラーが発生する可能性があります。

isStreaming プロパティを使用して、ストリームがストリーミングしていることを確認できます。これで True が返されるはずです。

# Verify that the stream is streaming
stream_df.isStreaming

データ ストリームを変換する

Delta テーブルからストリーミング DataFrame にデータを読み取った後は、Spark Structured Streaming API を使って処理できます。 たとえば、1 分ごとに行われた注文の数をカウントし、集計結果をダウンストリームのプロセスに送信して、準リアルタイムで視覚化できます。

この例では、Price 列に NULL が含まれる行がフィルター処理され、IsBike と Total の新しい列が追加されます。

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Delta テーブルをストリーミング シンクとして使用する

次に、データ ストリームは Delta テーブルに書き込まれます。

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Note

checkpointLocation オプションは、ストリーム処理の状態を追跡するチェックポイント ファイルを書き込むために使用されます。 このファイルを使用すると、ストリーム処理が中断された時点で障害から復旧できます。

ストリーミング プロセスが始まったら、Delta Lake テーブルに対してクエリを実行し、出力テーブルの内容を確認できます。 テーブルに対してクエリを実行できるようになるまでに少し時間がかかる場合があります。

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

このクエリの結果では、注文 3005 は、Price 列に NULL が含まれていたため除外されています。 そして、変換中に追加された 2 列 (IsBike と Total) が表示されます。

OrderID OrderDate Customer Product Quantity 価格 IsBike トータル
3001 2023-09-01 Yang Road Bike Red 1 1200 1 1200
3002 2023-09-01 Carlson Mountain Bike Silver 1 1500 1 1500
3003 2023-09-02 Wilson Road Bike Yellow 2 1350 1 2700
3004 2023-09-02 Yang Road Front Wheel 1 115 0 115

完了したら、不要な処理コストが生じないように、stop メソッドを使用してストリーミング データを停止します。

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

ヒント

データのストリーミングに Delta テーブルを使用する方法の詳細については、Delta Lake ドキュメントの「テーブル ストリーミングの読み取りと書き込み」を参照してください。