編集

次の方法で共有


Azure Databricks によるストリーム処理

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

この参照アーキテクチャでは、エンド ツー エンドのストリーム処理パイプラインを示します。 このパイプラインの 4 つの段階は、取り込み、処理、格納、および分析とレポートです。 この参照アーキテクチャでは、パイプラインは、2 つのソースからデータを取り込み、各ストリームの関連するレコードに対して結合を実行し、結果を強化させ、リアルタイムで平均を計算します。 結果は、さらに分析するために格納されます。

GitHub logo このアーキテクチャの参照実装は、 GitHub で入手できます。

アーキテクチャ

Azure Databricks を使用したストリーム処理の参照アーキテクチャを示す図。

このアーキテクチャの Visio ファイル をダウンロードします。

ワークフロー

次のデータフローは、前の図に対応しています。

  1. このアーキテクチャには、リアルタイムでデータ ストリームを生成する 2 つのデータ ソースがあります。 最初のストリームには乗車情報が含まれており、2 番目のストリームには料金情報が含まれます。 参照アーキテクチャには、一連の静的ファイルから読み取り、データを Azure Event Hubs にプッシュするシミュレートされたデータ ジェネレーターが含まれています。 実際のアプリケーションのデータ ソースは、タクシーにインストールされているデバイスです。

  2. Event Hubs は、イベント インジェスト サービスです。 このアーキテクチャでは、2 つのイベント ハブ インスタンス (データ ソースごとに 1 つ) を使用します。 各データ ソースは、関連付けられたイベント ハブにデータ ストリームを送信します。

  3. Azure Databricks は、Microsoft Azure クラウド サービス プラットフォーム用に最適化された Apache Spark ベースの分析プラットフォームです。 Azure Databricks は、タクシーの乗車データと料金データを関連付け、関連付けられたデータを Azure Databricks ファイル システムに格納されている近隣データにエンリッチするために使用されます。

  4. Azure Cosmos DB は、フル マネージドのマルチモデル データベース サービスです。 Azure Databricks ジョブの出力は一連のレコードであり、 Azure Cosmos DB for Apache Cassandraに書き込まれます。 Azure Cosmos DB for Apache Cassandra が使用されるのは、時系列データ モデリングをサポートしているためです。

    • Azure Synapse Link for Azure Cosmos DBすると、トランザクション ワークロードにパフォーマンスやコストの影響を与えることなく、Azure Cosmos DB の運用データに対してほぼリアルタイムで分析を実行できます。 これらの結果を得るには、サーバーレス SQL プール を使用し、Spark プール します。 これらの分析エンジンは、Azure Synapse Analytics ワークスペースから入手できます。

    • Microsoft Fabricで Azure Cosmos DB for NoSQL をミラーリング、Azure Cosmos DB データを Microsoft Fabric の残りのデータと統合できます。

  5. Log Analytics は、さまざまなソースからのログ データのクエリと分析を行う Azure Monitor 内のツールです。 Azure Monitor 収集されるアプリケーション ログ データは、Log Analytics ワークスペースに格納されます。 Log Analytics クエリを使用してメトリックを分析および視覚化し、ログ メッセージを検査してアプリケーション内の問題を特定できます。

シナリオの詳細

タクシー会社は、各タクシー乗車に関するデータを収集します。 このシナリオでは、2 つの個別のデバイスがデータを送信することを前提としています。 タクシーには、所要時間、距離、乗車場所と降車場所など、各乗車に関する情報を送信するメーターがあります。 別のデバイスでは、乗客からの支払いを受け付け、料金に関するデータを送信します。 タクシー会社は、ライダーの傾向を特定するために、各近隣の 1 マイルあたりの平均チップをリアルタイムで計算したいと考えています。

データ インジェスト

データ ソースをシミュレートするために、この参照アーキテクチャでは、ニューヨーク市のタクシー データ データセット1を使用します。 このデータセットには、2010 年から 2013 年までのニューヨーク市でのタクシー乗車に関するデータが含まれています。 乗車データと料金データ レコードの両方が含まれています。 乗車データには、乗車時間、乗車距離、乗車場所と降車場所が含まれます。 料金データには、料金、税、チップの金額が含まれます。 両方のレコードの種類のフィールドには、medallion 番号、ハック ライセンス、ベンダー ID が含まれます。 これら 3 つのフィールドの組み合わせは、タクシーとドライバーを一意に識別します。 データは CSV 形式で保存されます。

[1] Donovan, Brian; Work, Dan (2016):New York City Taxi Trip Data (2010-2013). イリノイ大学アーバナシャンペーン校。 https://doi.org/10.13012/J8PN93H8

データ ジェネレーターは、レコードを読み取って Event Hubs に送信する .NET Core アプリケーションです。 ジェネレーターは、JSON 形式の乗車データと CSV 形式の料金データを送信します。

Event Hubs では、 パーティション を使用してデータをセグメント化します。 複数のパーティションでは、コンシューマーは各パーティションを並列で読み取ることができます。 Event Hubs にデータを送信するときに、パーティション キーを直接指定できます。 それ以外の場合は、ラウンド ロビン方式でパーティションにレコードが割り当てられます。

このシナリオでは、乗車データと料金データに、特定のタクシーに対して同じパーティション ID を割り当てる必要があります。 この割り当てにより、Databricks は 2 つのストリームを関連付けるときに並列処理の程度を適用できます。 たとえば、乗車データの n パーティション内のレコードは、料金データのパーティション n のレコードと一致します。

Azure Databricks と Event Hubs によるストリーム処理のダイアグラム。

このアーキテクチャの Visio ファイルをダウンロード します。

データ ジェネレーターでは、両方のレコードの種類に対応した共通データ モデルに、 PartitionKeyMedallionHackLicenseを連結した VendorId プロパティがあります。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

このプロパティは、Event Hubs にデータを送信するときに明示的なパーティション キーを提供するために使用されます。

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Event Hubs のスループット容量は、 スループット ユニット で測定されます。 イベント ハブを自動スケールするには、自動インフレを有効にします。 この機能は、構成された最大値まで、トラフィックに基づいてスループット ユニットを自動的にスケーリングします。

ストリーム処理

Azure Databricks では、ジョブがデータ処理を実行します。 ジョブはクラスターに割り当てられ、そのジョブで実行されます。 ジョブには、Java で記述されたカスタム コードや、Spark ノートブックを使用できます。

この参照アーキテクチャでは、ジョブは Java と Scala で記述されたクラスを含む Java アーカイブです。 Databricks ジョブの Java アーカイブを指定すると、Databricks クラスターによって操作のクラスが指定されます。 ここで、 main クラスの com.microsoft.pnp.TaxiCabReader メソッドにはデータ処理ロジックが含まれています。

2 つのイベント ハブ インスタンスからストリームを読み取ります

データ処理ロジックでは、2 つの Azure イベント ハブ インスタンスからの読み取りに、 Spark 構造化ストリーミング が使用されます。

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

近隣情報を使用してデータを強化する

乗車データには、乗車場所と降車場所の緯度と経度の座標が含まれます。 これらの座標は便利ですが、分析には簡単には使用できません。 したがって、このデータは、シェープファイルから読み取られた近傍データでエンリッチされます。

シェープファイル形式はバイナリであり、簡単には解析されません。 ただし、GeoTools ライブラリには、シェープファイル形式を使用する地理空間データ用のツールが用意されています。 このライブラリは、com.microsoft.pnp.GeoFinder クラスで、乗車場所と降車場所の座標に基づいて近隣名を決定するために使用されます。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

乗車データと料金データを結合する

最初に、乗車データと料金データが変換されます。

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

次に、乗車データが料金データと結合されます。

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

データを処理して Azure Cosmos DB に挿入する

各近隣の平均料金は、特定の時間間隔で計算されます。

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

その後、平均料金が Azure Cosmos DB に挿入されます。

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

考慮事項

これらの考慮事項は、ワークロードの品質向上に使用できる一連の基本原則である Azure Well-Architected Framework の要素を組み込んでいます。 詳細については、「Microsoft Azure Well-Architected Framework」を参照してください。

セキュリティ

セキュリティは、意図的な攻撃や貴重なデータとシステムの誤用に対する保証を提供します。 詳細については、「セキュリティ設計レビューチェックリスト」を参照してください。

Azure Databricks ワークスペースへのアクセスは、管理者コンソールを使用して制御されます。 管理者コンソールには、ユーザーの追加、ユーザーのアクセス許可の管理、シングル サインオンの設定を行う機能が含まれています。 ワークスペース、クラスター、ジョブ、およびテーブルのアクセス制御も、管理者コンソールで設定できます。

シークレットを管理する

Azure Databricks には、資格情報を格納し、ノートブックやジョブで参照するために使用される シークレット ストア が含まれています。 Azure Databricks シークレット ストア内のパーティション シークレットをスコープします。

databricks secrets create-scope --scope "azure-databricks-job"

シークレットは、スコープ レベルで追加されます。

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Note

ネイティブの Azure Databricks スコープの代わりに、Azure Key Vault ベースのスコープ を使用します。

コードでは、Azure Databricks シークレット ユーティリティを介してシークレットにアクセスします。

コストの最適化

コストの最適化では、不要な経費を削減し、運用効率を向上させる方法に重点を置いています。 詳細については、「コストの最適化設計レビューチェックリスト」を参照してください。

コストの見積もりには、Azure 料金計算ツール をご利用ください。 この参照アーキテクチャで使用される次のサービスについて考えてみましょう。

Event Hubs のコストに関する考慮事項

この参照アーキテクチャでは、Standard レベルで Event Hubs をデプロイします。 価格モデルは、スループット ユニット、イングレス イベント、およびキャプチャ イベントに基づいています。 イングレス イベントは、64 KB 以下のデータの単位です。 よりサイズが大きいメッセージには、64 KB の倍数で課金されます。 スループット ユニットは、Azure portal または Event Hubs の管理 API のどちらかを使用して、指定します。

保持日数を増やす必要がある場合は、専用レベルを検討してください。 このレベルでは、厳しい要件を持つシングルテナントデプロイが提供されます。 このオファリングは、容量ユニットに基づくクラスターを構築し、スループット ユニットに依存しません。 Standard レベルは、イングレス イベントとスループット ユニットに基づいて課金されます。

詳細については、「Event Hubs の価格」を参照してください。

Azure Databricks のコストに関する考慮事項

Azure Databricks には Standard レベルと Premium レベルが用意されており、どちらも 3 つのワークロードをサポートしています。 この参照アーキテクチャでは、Premium レベルに Azure Databricks ワークスペースがデプロイされます。

データ エンジニアリング ワークロードは、ジョブ クラスターで実行する必要があります。 データ エンジニアは、クラスターを使用してジョブを構築および実行します。 データ分析ワークロードは、万能クラスターで実行する必要があり、データ サイエンティストが対話形式でデータと分析情報を探索、視覚化、操作、共有することを目的としています。

Azure Databricks には、複数の価格モデルが用意されています。

  • 従量課金制プランの する

    選択した VM インスタンスに基づいて、クラスターと Azure Databricks ユニット (DBU) でプロビジョニングされた仮想マシン (VM) に対して課金されます。 DBU は、1 秒あたりの使用量によって課金される処理機能の単位です。 DBU の使用量は、Azure Databricks で実行されるインスタンスのサイズと種類によって異なります。 価格は、選択したワークロードと階層によって異なります。

  • 購入前プランの

    従量課金制モデルと比較して、その期間の総保有コストを削減するために、1 年または 3 年間、Azure Databricks コミット ユニットとして DBU にコミットします。

詳細については、Azure Databricks の価格 に関するページを参照してください。

Azure Cosmos DB のコストに関する考慮事項

このアーキテクチャでは、Azure Databricks ジョブは一連のレコードを Azure Cosmos DB に書き込みます。 予約した容量に対して課金されます。これは 1 秒あたりの要求ユニット数 (RU/秒) で測定されます。 この容量は、挿入操作を実行するために使用されます。 課金の単位は 1 時間あたり 100 RU/秒です。 たとえば、100 KB の項目を書き込むコストは 50 RU/秒です。

書き込み操作の場合、1 秒あたりに必要な書き込みの数をサポートするために十分な容量をプロビジョニングします。 書き込み操作を実行する前にポータルまたは Azure CLI を使用し、それらの操作が完了した後でスループットを減らすことで、プロビジョニングされたスループットを増やすことができます。 書き込み期間のスループットは、特定のデータに必要な最小スループットと挿入操作に必要なスループットの合計です。 この計算では、他のワークロードが実行されていないことを前提としています。

コスト分析の例

コンテナーでスループット値 1,000 RU/秒を構成するとします。 30 日間、合計で 720 時間、24 時間デプロイされます。

コンテナーは、1 時間あたり 100 RU/秒の 10 単位で課金されます。 $0.008 (1 時間あたり 100 RU/秒あたり) の 10 ユニットは、1 時間あたり $0.08 で課金されます。

720 時間または 7,200 ユニット (100 RU) の場合、その月に対して 57.60 ドルが課金されます。

ストレージは、格納されているデータとインデックスに使用される GB ごとにも課金されます。 詳細については、 Azure Cosmos DB の価格モデルに関するページを参照してください。

ワークロード コストの見積もりを簡単に行うには、Azure Cosmos DB 容量計算ツール を使用します。

オペレーショナル エクセレンス

オペレーショナル エクセレンスは、アプリケーションをデプロイし、運用環境で実行し続ける運用プロセスを対象としています。 詳細については、「オペレーショナル エクセレンス設計レビュー チェックリスト」を参照してください。

監視

Azure Databricks は Apache Spark に基づいています。 Azure Databricks と Apache Spark はどちらも、Apache Log4j をログ記録用の標準ライブラリとして使用します。 Apache Spark で提供される既定のログ記録に加えて、Log Analytics にログ記録を実装できます。 詳細については、「Azure Databricks の監視」を参照してください。

com.microsoft.pnp.TaxiCabReader クラスが乗車メッセージと料金メッセージを処理するため、メッセージの形式が正しくないため、無効になる可能性があります。 運用環境では、これらの形式が正しくないメッセージを分析してデータ ソースの問題を特定し、データ損失を防ぐために迅速に修正できるようにすることが重要です。 com.microsoft.pnp.TaxiCabReader クラスは、不正な料金レコードと乗車レコードの数を追跡する Apache Spark アキュムレータを登録します。

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark は Dropwizard ライブラリを使用してメトリックを送信します。 一部のネイティブの Dropwizard メトリック フィールドは Log Analytics と互換性がありません。このため、この参照アーキテクチャにはカスタムの Dropwizard シンクとレポーターが含まれています。 Log Analytics が想定する形式でメトリックを書式設定します。 Apache Spark によってメトリックがレポートされると、形式に誤りがある乗車データと料金データに対するカスタム メトリックも送信されます。

Log Analytics ワークスペースで次のクエリ例を使用して、ストリーミング ジョブの操作を監視できます。 各クエリの引数 ago(1d) は、最後の日に生成されたすべてのレコードを返します。 このパラメーターを調整して、別の期間を表示できます。

ストリーム クエリ操作中にログに記録される例外

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

形式に誤りがある料金データと乗車データの蓄積

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

時間の経過に伴うジョブ操作

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

リソースの編成とデプロイ

  • 運用、開発、およびテスト環境それぞれに対して個別のリソース グループを作成してください。 個別のリソース グループにより、デプロイの管理、テスト デプロイの削除、およびアクセス権の割り当てが行いやすくなります。

  • Azure Resource Manager テンプレート を使用して、コードとしてのインフラストラクチャ プロセスに従って Azure リソースをデプロイします。 テンプレートを使用すると、Azure DevOps サービス またはその他の継続的インテグレーションおよび継続的デリバリー (CI/CD) ソリューションを使用してデプロイを簡単に自動化できます。

  • 各ワークロードを別々のデプロイ テンプレートに配置し、リソースをソース管理システムに格納します。 テンプレートは一緒にデプロイすることも、CI/CD プロセスの一環として個別にデプロイすることもできます。 この方法により、自動化プロセスが簡略化されます。

    このアーキテクチャでは、Event Hubs、Log Analytics、および Azure Cosmos DB が 1 つのワークロードとして識別されます。 これらのリソースは、1 つの Azure Resource Manager テンプレートに含まれています。

  • ワークロードをステージングすることを検討してください。 次のステージに進む前に、さまざまなステージにデプロイし、各ステージで検証チェックを実行します。 そうすることで、運用環境に更新プログラムをプッシュする方法を制御し、予期しないデプロイの問題を最小限に抑えることができます。

    このアーキテクチャには、複数のデプロイ ステージがあります。 Azure DevOps パイプラインを作成し、それらのステージを追加することを検討してください。 次のステージを自動化できます。

    • Databricks クラスターを起動します。
    • Databricks CLI を構成します。
    • Scala ツールをインストールします。
    • Databricks シークレットを追加します。

    Databricks コードとそのライフサイクルの品質と信頼性を向上させるために、自動化された統合テストを作成することを検討してください。

このシナリオのデプロイ

参照実装をデプロイして実行するには、GitHub readmeの手順に従います。

次のステップ