ストリーミング データで Delta Lake を使用する

完了

ここまで調べてきたすべてのデータは、ファイル内の静的データでした。 しかし、多くのデータ分析シナリオでは、準リアルタイムで処理する必要がある "ストリーミング" データを使用します。 たとえば、モノのインターネット (IoT) デバイスによって生成された読み取り値をキャプチャし、それらが発生したときにテーブルに格納する必要がある場合があります。

Spark Structured Streaming

一般的なストリーム処理ソリューションでは、"ソース" から絶えずデータのストリームを読み取り、必要に応じてそれを処理して特定のフィールドを選択したり、値を集計およびグループ化したり、データを操作したり、または結果を "シンク" に書き込んだりします。

Spark には、ストリーミング データのネイティブ サポートが、Spark Structured Streaming によって含まれています。これは、ストリーミング データをキャプチャして処理する境界のないデータフレームに基づく API です。 Spark Structured Streaming データフレームでは、ネットワーク ポート、Azure Event Hubs や Kafka などのリアルタイム メッセージ ブローカー サービス、ファイル システムの場所など、さまざまな種類のストリーミング ソースからデータを読み取ることができます。

ヒント

Spark Structured Streaming について詳しくは、Spark ドキュメントの「Structured Streaming Programming Guide (Structured Streaming プログラミング ガイド)」を参照してください。

Delta Lake テーブルを使用したストリーミング

Delta Lake テーブルは、Spark Structured Streaming のソースまたはシンクとして使用できます。 たとえば、IoT デバイスからリアルタイム データのストリームをキャプチャし、ストリームをシンクとしての Delta Lake テーブルに直接書き込めば、テーブルにクエリを実行して最新のストリーミング データを表示できます。 または、Delta テーブルをストリーミング ソースとして読み取ると、テーブルに追加された新しいデータを絶えずレポートできます。

Delta Lake テーブルをストリーミング ソースとして使用する

次の PySpark の例では、Delta Lake テーブルを使ってインターネット販売注文の詳細を格納します。 新しいデータが追加されると、Delta Lake テーブル フォルダーからデータを読み取るストリームが作成されます。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .load("/delta/internetorders")

# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

Note

Delta Lake テーブルをストリーミング ソースとして使用する場合、"追加" 操作のみをストリームに含めることができます。 ignoreChanges または ignoreDeletes オプションを指定しない限り、データを変更するとエラーが発生します。

Delta Lake テーブルからストリーミング データフレームにデータを読み取った後は、Spark Structured Streaming API を使って処理できます。 上記の例では、データフレームはシンプルに表示されます。しかし、Spark Structured Streaming を使用して、テンポラル ウィンドウにわたるデータを集計し (たとえば、1 分ごとに発注された注文数をカウントするなど)、集計された結果をダウンストリーム プロセスに送信して、準リアルタイムで視覚化することもできます。

Delta Lake テーブルをストリーミング シンクとして使用する

次の PySpark の例では、フォルダー内の JSON ファイルからデータのストリームが読み取られます。 各ファイルの JSON データには、IoT デバイスの状態が {"device":"Dev1","status":"ok"} という形式で含まれます。フォルダーにファイルが追加されるたびに、ストリームに新しいデータが追加されます。 入力ストリームは境界のないデータフレームであり、Delta Lake テーブルのフォルダーの場所にデルタ形式で書き込まれます。

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)

Note

checkpointLocation オプションは、ストリーム処理の状態を追跡するチェックポイント ファイルを書き込むために使用されます。 このファイルを使用すると、ストリーム処理が中断された時点で障害から復旧できます。

ストリーミング処理が開始されたら、ストリーミングの出力が書き込まれている Delta Lake テーブルに対してクエリを実行し、最新のデータを確認できます。 たとえば、次のコードでは、Delta Lake テーブル フォルダーのカタログ テーブルを作成し、クエリを実行します。

%%sql

CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';

SELECT device, status
FROM DeviceTable;

Delta Lake テーブルに書き込まれているデータのストリームを停止するには、ストリーミング クエリの stop メソッドを使用します。

delta_stream.stop()

ヒント

データのストリーミングに Delta Lake テーブルを使用する方法について詳しくは、Delta Lake ドキュメントの「Table streaming reads and writes (テーブル ストリーミングの読み取りと書き込み)」を参照してください。