ストリーミング データにデルタ テーブルを使用する
ここまで調べてきたすべてのデータは、ファイル内の静的データでした。 しかし、多くのデータ分析シナリオでは、準リアルタイムで処理する必要がある "ストリーミング" データを使用します。 たとえば、モノのインターネット (IoT) デバイスによって生成された読み取り値をキャプチャし、それらが発生したときにテーブルに格納する必要がある場合があります。 Spark はバッチ データとストリーミング データを同じ方法で処理するため、同じ API を使用してストリーミング データをリアルタイムで処理できます。
Spark Structured Streaming
一般的なストリーム処理ソリューションには、次が含まれます。
- "ソース" から絶えずデータのストリームを読み取ります。
- 必要に応じてデータを処理して特定のフィールドを選択したり、値を集計およびグループ化したり、またはデータを操作したりします。
- 結果を "シンク" に書き込みます。
Spark には、ストリーミング データのネイティブ サポートが、Spark Structured Streaming によって含まれています。これは、ストリーミング データをキャプチャして処理する境界のないデータフレームに基づく API です。 Spark Structured Streaming データフレームは、次のようなさまざまなストリーミング ソースからデータを読み取ることができます。
- ネットワーク ポート
- Azure Event Hubs や Kafka などのリアル タイム メッセージ ブローカー サービス
- ファイル システムの場所。
ヒント
Spark Structured Streaming について詳しくは、Spark ドキュメントの「Structured Streaming Programming Guide (Structured Streaming プログラミング ガイド)」を参照してください。
Delta テーブルを使用したストリーミング
Delta テーブルは、Spark Structured Streaming の "ソース" または "シンク" として使用できます。 たとえば、IoT デバイスからリアル タイム データのストリームを取り込み、そのストリームをシンクとして Delta テーブルに直接書き込むことができます。 次に、テーブルに対してクエリを実行し、最新のストリーミング データを確認できます。 または、ストリーミング ソースとして差分を読み取り、新しいデータがテーブルに追加されたときに準リアルタイムのレポートを実現することもできます。
Delta テーブルをストリーミング ソースとして使用する
次の PySpark の例では、インターネット販売注文の詳細を格納する Delta テーブルが作成されます。
%%sql
CREATE TABLE orders_in
(
OrderID INT,
OrderDate DATE,
Customer STRING,
Product STRING,
Quantity INT,
Price DECIMAL
)
USING DELTA;
インターネット注文の架空のデータ ストリームが orders_in テーブルに挿入されます。
%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
(3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
(3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
(3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
(3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
(3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);
確認するには、入力テーブルからデータを読み取り、表示します。
# Read and display the input table
df = spark.read.format("delta").table("orders_in")
display(df)
次に、データは Delta テーブルからストリーミング DataFrame に読み込まれます。
# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.table("orders_in")
Note
Delta テーブルをストリーミング ソースとして使用する場合、"追加" 操作のみをストリームに含めることができます。 ignoreChanges
または ignoreDeletes
オプションを指定しない限り、データを変更するとエラーが発生する可能性があります。
isStreaming
プロパティを使用して、ストリームがストリーミングしていることを確認できます。これで True が返されるはずです。
# Verify that the stream is streaming
stream_df.isStreaming
データ ストリームを変換する
Delta テーブルからストリーミング DataFrame にデータを読み取った後は、Spark Structured Streaming API を使って処理できます。 たとえば、1 分ごとに行われた注文の数をカウントし、集計結果をダウンストリームのプロセスに送信して、準リアルタイムで視覚化できます。
この例では、Price 列に NULL が含まれる行がフィルター処理され、IsBike と Total の新しい列が追加されます。
from pyspark.sql.functions import col, expr
transformed_df = stream_df.filter(col("Price").isNotNull()) \
.withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
.withColumn('Total', expr("Quantity * Price").cast('decimal'))
Delta テーブルをストリーミング シンクとして使用する
次に、データ ストリームは Delta テーブルに書き込まれます。
# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)
print("Streaming to orders_processed...")
Note
checkpointLocation
オプションは、ストリーム処理の状態を追跡するチェックポイント ファイルを書き込むために使用されます。 このファイルを使用すると、ストリーム処理が中断された時点で障害から復旧できます。
ストリーミング プロセスが始まったら、Delta Lake テーブルに対してクエリを実行し、出力テーブルの内容を確認できます。 テーブルに対してクエリを実行できるようになるまでに少し時間がかかる場合があります。
%%sql
SELECT *
FROM orders_processed
ORDER BY OrderID;
このクエリの結果では、注文 3005 は、Price 列に NULL が含まれていたため除外されています。 そして、変換中に追加された 2 列 (IsBike と Total) が表示されます。
OrderID | OrderDate | Customer | Product | Quantity | 価格 | IsBike | トータル |
---|---|---|---|---|---|---|---|
3001 | 2023-09-01 | Yang | Road Bike Red | 1 | 1200 | 1 | 1200 |
3002 | 2023-09-01 | Carlson | Mountain Bike Silver | 1 | 1500 | 1 | 1500 |
3003 | 2023-09-02 | Wilson | Road Bike Yellow | 2 | 1350 | 1 | 2700 |
3004 | 2023-09-02 | Yang | Road Front Wheel | 1 | 115 | 0 | 115 |
完了したら、不要な処理コストが生じないように、stop
メソッドを使用してストリーミング データを停止します。
# Stop the streaming data to avoid excessive processing costs
deltastream.stop()
ヒント
データのストリーミングに Delta テーブルを使用する方法の詳細については、Delta Lake ドキュメントの「テーブル ストリーミングの読み取りと書き込み」を参照してください。