Spark 構造化ストリーミングを使用してレイクハウスにストリーミング データを取得する
構造化ストリーミングは、Spark を基盤とするスケーラブルなフォールト トレラント ストリーム処理エンジンです。 Spark は、データの到着が続くにつれて、ストリーミング操作を段階的かつ継続的に実行します。
Spark 2.2 で構造化ストリーミングが利用可能になりました。 それ以来、データ ストリーミングに推奨されるアプローチとなっています。 構造化ストリームの背後にある基本的な原則は、ライブ データ ストリームを、テーブル内の新しい行のように、常に新しいデータが継続的に追加されるテーブルとして扱うことです。 CSV、JSON、ORC、Parquet などの定義済みの組み込みストリーミング ファイル ソースと、Kafka や Event Hubs などのメッセージング サービスの組み込みサポートがあります。
この記事では、高スループットの運用環境で Spark 構造化ストリーミングを使用してイベントの処理と取り込みを最適化する方法について説明します。 推奨されるアプローチをいくつか次に示します。
- データ ストリーミング スループットの最適化
- デルタ テーブルでの書き込み操作の最適化と
- イベントのバッチ処理
Spark ジョブ定義と Spark ノートブック
Spark ノートブックは、アイデアを検証し、データやコードから分析情報を得るための実験を行うための優れたツールです。 ノートブックは、データの準備、視覚化、機械学習、およびその他のビッグ データのシナリオでも広く使用されています。 Spark ジョブ定義は、Spark クラスターで長時間実行されている非対話型のコード指向タスクです。 Spark ジョブ定義は、堅牢性と可用性を提供します。
Spark ノートブックは、コードのロジックをテストし、すべてのビジネス要件に対処するための優れたソースです。 ただし、運用環境のシナリオで実行し続けるために、再試行ポリシーが有効になっている Spark ジョブ定義が最適なソリューションです。
Spark ジョブ定義の再試行ポリシー
Microsoft Fabric では、ユーザーは Spark ジョブ定義ジョブの再試行ポリシーを設定できます。 ジョブ内のスクリプトは無限である可能性がありますが、スクリプトを実行しているインフラストラクチャでは、ジョブの停止が必要な問題が発生する可能性があります。 または、基になるインフラストラクチャの修正プログラムの適用ニーズにより、ジョブを削除することもできます。 再試行ポリシーを使用すると、基になる問題が原因でジョブが停止した場合に、ジョブを自動的に再起動するためのルールを設定できます。 パラメーターは、ジョブを再起動する頻度、最大無限の再試行、再試行間隔の設定を指定します。 これにより、ユーザーは、ユーザーが停止するまで Spark ジョブ定義ジョブを無限に実行し続けることができます。
ストリーミング ソース
Event Hubs でストリーミングを設定するには、基本的な構成が必要です。これには、Event Hubs 名前空間名、ハブ名、共有アクセス キー名、コンシューマー グループが含まれます。 コンシューマー グループは、イベント ハブ全体のビューです。 こうすることで、複数の使用アプリケーションが、Eventstream の個別のビューをそれぞれ保有し、独自のペースで独自のオフセットによってストリームを別々に読み取ることができます。
パーティションは、大量のデータを処理できるようにするために不可欠な部分です。 1 つのプロセッサの 1 秒あたりのイベント処理容量は限られていますが、複数のプロセッサは並列で実行するとより優れたジョブを実行できます。 パーティションを使用すると、大量のイベントを並列で処理できます。
インジェスト率の低いパーティションが多すぎると、パーティション リーダーはこのデータのごく一部を処理し、最適でない処理を引き起こします。 パーティションの理想的な数は、必要な処理速度によって直接左右されます。 イベント処理のスケール設定を希望している場合は、パーティションを追加したほうが良い場合があります。 パーティションに特定のスループット制限はありません。 しかし、名前空間の総スループットは、スループット ユニットの数によって制限されます。 名前空間内のスループット ユニットの数を増やすときは、独自の最大スループットを実現するために、同時読み取りを許可するための追加のパーティションが必要になる場合があります。
推奨される方法は、スループット シナリオに最適なパーティション数を調査してテストすることです。 ただし、32 以上のパーティションを使用して高スループットのシナリオを見るのはよくあることです。
Spark アプリケーションを Azure Event Hubs に接続するには、Apache Spark 用の Azure Event Hubs コネクタ (azure-event-hubs-spark) をお勧めします。
ストリーミング シンクとしてのレイクハウス
Delta Lake は、データ レイク ストレージ ソリューションの上に ACID (原子性、整合性、独立性、持続性) トランザクションを提供するオープンソース ストレージ レイヤーです。 Delta Lake では、スケーラブルなメタデータ処理、スキーマの進化、タイム トラベル (データのバージョン管理)、オープン形式、その他の機能もサポートされています。
Fabric Data Engineering では、Delta Lake は次の用途に使用されます。
- Spark SQL を使用してデータのアップサート (挿入/更新) と削除を簡単に行うことができます。
- データのクエリに費やされる時間を最小限に抑えるためにデータを圧縮します。
- 操作の実行前と実行後のテーブルの状態を表示します。
- テーブルに対して実行された操作の履歴を取得します。
Delta は、writeStream で使用可能な出力シンク形式の 1 つとして追加されます。 既存の出力シンクの詳細については、「Spark Structured Streaming Programming Guide」を参照してください。
次の例では、Delta Lake にデータをストリーミングする方法を示します。
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
例で切り取られたコードについて:
- format() は、データの出力形式を定義する命令です。
- outputMode() は、ストリーミング内の新しい行の書き込み方法 (つまり、追加、上書き) を定義します。
- toTable() は、パラメーターとして渡された値を使用して作成された Delta テーブルに、ストリーミングされたデータを保持します。
Delta 書き込みの最適化
データパーティション分割は、堅牢なストリーミング ソリューションを作成する上で重要な部分です。パーティション分割により、データがよく整理され、スループットも向上します。 Delta 操作後にファイルが簡単に断片化され、小さなファイルが過多になります。 また、ディスクに書き込む時間が長いため、大きすぎるファイルも問題になります。 データパーティション分割の課題は、最適なファイル サイズを生み出す適切なバランスを見つけることです。 Spark では、メモリとディスクでのパーティション分割がサポートされています。 適切にパーティション分割されたデータは、Delta Lake にデータを保持し、Delta Lake からのデータに対してクエリを実行するときに最適なパフォーマンスを提供できます。
- ディスク上のデータをパーティション分割する場合は、 partitionBy() を使用して列に基づいてデータをパーティション分割する方法を選択できます。 partitionBy() は、ディスクへの書き込み中に提供される 1 つまたは複数の列に基づいて、大きなセマンティック モデルをより小さなファイルにパーティション分割するために使用される関数です。 パーティション分割は、大規模なセマンティック モデルを操作するときのクエリのパフォーマンスを向上させる方法です。 小さすぎる、または大きすぎるパーティションを生成する列を選択しないでください。 カーディナリティの良い列のセットに基づいてパーティションを定義し、データを最適なサイズのファイルに分割します。
- メモリ内のデータのパーティション分割は、repartition() または coalesce() 変換を使用して行い、複数のワーカー ノードにデータを分散し、回復性がある分散データセット (RDD) の基礎を使用してデータを並列に読み取って処理できる複数のタスクを作成します。 これにより、セマンティック モデルを論理パーティションに分割できます。これは、クラスターのさまざまなノードで計算できます。
- repartition() は、メモリ内のパーティションの数を増減するために使用されます。 再パーティション分割では、ネットワーク経由でデータ全体が再シャッフルされ、すべてのパーティションに分散されます。
- coalesce() は、パーティションの数を効率的に減らすためだけに使用されます。 これは最適化されたバージョンの repartition() です。coalesce() を使用すると、すべてのパーティション間でのデータの移動が低くなります。
両方のパーティション分割アプローチを組み合わせることは、高スループットのシナリオで適したソリューションです。 repartition() は メモリ内に特定の数のパーティションを作成し、 partitionBy() はメモリ パーティションとパーティション分割列ごとにファイルをディスクに書き込みます。 次の例は、同じ Spark ジョブでの両方のパーティション分割戦略の使用方法を示しています。データは、最初にメモリ内の 48 個のパーティションに分割され (合計 48 個の CPU コアがある場合)、ペイロード内の 2 つの既存の列に基づいてディスクにパーティション分割されます。
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
最適化された書き込み
Delta Lake への書き込みを最適化するもう 1 つのオプションは、最適化された書き込みを使用することです。 最適化された書き込みは、Delta テーブルへのデータの書き込み方法を改善するオプションの機能です。 Spark は、データを書き込む前にパーティションを結合または分割し、ディスクに書き込まれるデータのスループットを最大化します。 ただし、フル シャッフルが発生するため、一部のワークロードではパフォーマンスが低下する可能性があります。 coalesce() や repartition() を使用してディスク上のデータをパーティション分割するジョブは、代わりに最適化された書き込みの使用を開始するようにリファクタリングできます。
次のコードは、最適化された書き込みの使用例です。 partitionBy() は引き続き使用されることに注意してください。
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
イベントのバッチ処理
Delta Lake へのデータの取り込みに費やされる時間を短縮するための操作の数を最小限に抑えるために、イベントのバッチ処理は実用的な代替手段です。
トリガーは、ストリーミング クエリを実行 (トリガー) して、新しいデータを出力する頻度を定義します。 これらを設定すると、マイクロバッチの定期的な処理時間のサイクル間隔が定義され、常にディスクに書き込みを行うのではなく、データが蓄積され、イベントがいくつかの永続的な操作にバッチ処理されます。
次の例は、イベントが 1 分間隔で定期的に処理されるストリーミング クエリを示しています。
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(f.from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Delta テーブル書き込み操作でイベントのバッチ処理を組み合わせる利点は、より多くのデータを含む大きな Delta ファイルを作成し、小さなファイルを回避できることです。 取り込まれるデータの量を分析し、Delta ライブラリによって作成された Parquet ファイルのサイズを最適化するための最適な処理時間を見つける必要があります。
監視
Spark 3.1 以降のバージョンには、次のストリーミング メトリックを含む構造化ストリーミング UI が組み込まれています。
- 入力レート
- プロセスレート
- 入力行
- バッチ期間
- オペレーション期間
関連するコンテンツ
- レイクハウスにストリーミング データを送り SQL 分析エンドポイントを使用してアクセスします。