編輯

共用方式為


使用 Azure Databricks 進行串流處理

Azure Cosmos DB
Azure Databricks
Azure 事件中樞
Azure Log Analytics
Azure 監視器

此參考架構顯示了端對端流處理管線。 這種類型的管線有四個階段:攝取、處理、儲存以及分析和報告。 對於此參考架構,管線從兩個來源獲取資料,對每個流中的相關記錄執行聯接,豐富結果,並即時計算平均值。 結果被儲存以供進一步分析。

GitHub 標誌GitHub 上提供了此架構的參考實作。

架構

顯示使用 Azure Databricks 進行流程處理的參考體系結構的圖表。

下載此架構的 Visio 檔案。

工作流程

此架構由下列元件組成:

資料來源。 在這個架構中,有兩個即時產生資料流的資料來源。 第一個流包含乘車訊息,第二個流包含票價資訊。 這個參考架構包括一個模擬資料產生器,它從一組靜態檔案中讀取資料並將資料推送到事件中心。 實際應用程式中的資料來源是安裝在計程車中的設備。

Azure 事件中樞事件中樞是一種事件擷取服務。 此體系結構使用兩個事件中心執行個體,每個資料來源一個。 每個資料來源都會向關聯的事件中心發送資料流。

Azure Databricks。 Databricks 是一個基於 Apache Spark 的分析平台,針對 Microsoft Azure 雲端服務平台進行了最佳化。 Databricks 用於關聯計程車行程和票價資料,並利用 Databricks 文件系統中儲存的鄰裡資料來豐富相關資料。

Azure Cosmos DB。 Azure Databricks 作業的輸出是一系列記錄,這些記錄會寫入 Azure Cosmos DB for Apache Cassandra。 使用 Azure Cosmos DB for Apache Cassandra 是因為它支援時間序列資料模型化。

Azure 記錄分析 (OMS)Azure 監視器收集的應用程式記錄資料儲存在記錄分析工作區中。 記錄分析查詢可用於分析和視覺化指標並檢查記錄訊息以識別應用程式中的問題。

替代項目

  • Synapse Link 是 Microsoft 在 Azure Cosmos DB 資料之上進行分析的首選解決方案。

案例詳細資料

場景:一家計程車公司收集有關每次計程車行程的資料。 對於這種情況,我們假設有兩個單獨的裝置發送資料。 計程車有一個計價器,可以發送每次乘車的訊息——持續時間、距離以及上車和下車地點。 一個單獨的設備接受客戶的付款並發送有關票價的資料。 為了了解客流量趨勢,計程車公司希望即時計算每個社區每英里行駛的平均小費。

潛在使用案例

該解決方案針對零售業進行了最佳化。

資料提取

為了模擬資料來源,此參考架構使用紐約市計程車資料資料集 [1]。 該資料集包含有關紐約市四年期間 (2010 年至 2013 年) 計程車出行的資料。 它包含兩種類型的記錄:行程資料和票價資料。 行程資料包括行程持續時間、行程距離以及上車和下車地點。 票價資料包括票價、稅金和小費金額。 兩種記錄類型中的常見欄位包括獎章號碼、駭客許可證和供應商 ID。 這三個字段共同唯一地標識出租車和司機。 資料以 CSV 格式儲存。

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013).[1] 布萊恩‧多諾萬; Work,Dan (2016):紐約市計程車出行資料(2010-2013)。 伊利諾大學厄巴納香檳校區。 https://doi.org/10.13012/J8PN93H8

資料產生器是一個 .NET Core 應用程式,它會讀取記錄並將其傳送到 Azure 事件中心。 生成器會傳送 JSON 格式的乘車資料和 CSV 格式的票價資料。

事件中心使用分區來分段資料。 分割區允許使用者並行讀取每個分割區。 將資料傳送到事件中心時,可以明確指定分區鍵。 否則,記錄將以循環方式分配給分割區。

在這種情況下,對於給定的計程車,乘車資料和票價資料應具有相同的分區 ID。 這使得 Databricks 在關聯兩個流時能夠應用一定程度的並行性。 乘車資料的分區 n 中的記錄將與票價資料的分區 n 中的記錄相符。

使用 Azure Databricks 和事件中心進行流程處理的圖表。

下載此架構的 Visio 檔案。

在資料產生器中,兩種記錄類型的公共資料模型都有一個屬性,即 PartitionKeyMedallion HackLicenseVendorId 的串聯。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

此屬性用於在傳送到事件中心時提供明確分割區鍵:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

事件中樞

事件中心的輸送量以輸送量單位來衡量。 您可以透過啟用自動擴充來自動縮放事件中心,該功能會根據流量自動縮放輸送量單位,直到達到設定的最大值。

串流處理

在 Azure Databricks 中,資料處理由作業執行。 該作業被分配到叢集並在叢集上執行。 這份作業可以是用 Java 寫的自訂程式碼,也可以是 Spark 筆記本

在此參考架構中,作業是一個 Java 存檔,其中包含用 Java 和 Scala 編寫的類別。 為 Databricks 作業指定 Java 存檔時,該類別被指定為由 Databricks 叢集執行。 這裡,com.microsoft.pnp.TaxiCabReader 類別的 main 方法包含了資料處理邏輯。

從兩個事件中樞執行個體讀取串流

資料處理邏輯使用 Spark 結構化串流從兩個 Azure 事件中心執行個體讀取:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

利用鄰裡資訊豐富資料

乘車資料包括上車和下車地點的緯度和經度座標。 雖然這些座標很有用,但它們不容易用於分析。 因此,該資料透過從 shapefile 讀取的鄰域資料進行了豐富。

shapefile 格式是二進位的,不易解析,但 GeoTools 函式庫提供了用於使用 shapefile 格式的地理空間資料的工具。 該資料庫在 com.microsoft.pnp.GeoFinder 類別上使用,根據接送座標確定社區名稱。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

加入行程和票價資料

首先,轉換行程和票價資料:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

然後將乘車資料與票價資料連接起來:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

處理資料並將其插入 Azure Cosmos DB

計算在給定時間間隔內每個街區的平均票價:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

然後將其插入到 Azure Cosmos DB 中:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

考量

這些考量能實作 Azure Well-Architected Framework 的支柱,其為一組指導原則,可以用來改善工作負載的品質。 如需更多資訊,請參閱 Microsoft Azure 結構完善的架構

安全性

安全性可提供保證,以避免刻意攻擊和濫用您寶貴的資料和系統。 如需詳細資訊,請參閱安全性支柱的概觀

使用管理員控制台控制對 Azure Databricks 工作區的存取。 管理員控制台包括新增使用者、管理使用者權限和設定單一登入的功能。 也可以透過管理員控制台設定工作區、叢集、作業和表格的存取控制。

管理密碼

Azure Databricks 包含一個機密存放區,用來儲存認證,並在筆記本和工作中參考。 Azure Databricks 秘密儲存中的秘密依範圍進行分割:

databricks secrets create-scope --scope "azure-databricks-job"

秘密是在範圍層級新增的:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

注意

應使用 Azure Key Vault 支援的範圍來取代本機 Azure Databricks 範圍。 若要深入了解,請參閱 Azure Key Vault 支援的範圍

在程式碼中,可透過 Azure Databricks 機密實用程式存取機密。

監視

Azure Databricks 是基於 Apache Spark,並且都使用 log4j 作為記錄的標準函式庫。 除了 Apache Spark 提供的預設記錄之外,您還可以依照監視 Azure Databricks 一文實作 Azure 記錄分析。

com.microsoft.pnp.TaxiCabReader 類別處理乘車和票價訊息時,其中任何一個訊息都可能格式錯誤,因此無效。 在生產環境中,分析這些格式錯誤的訊息以識別資料來源的問題非常重要,這樣就可以快速修復問題以防止資料遺失。 該 com.microsoft.pnp.TaxiCabReader 類別註冊一個 Apache Spark Accumulator,用於追蹤格式錯誤的票價和乘車記錄的數量:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark 使用 Dropwizard 庫傳送指標,且某些本機 Dropwizard 指標欄位與 Azure 記錄分析不相容。 因此,此參考架構包括自訂 Dropwizard 接收器和報告器。 它以 Azure 記錄分析所需的格式格式化指標。 當 Apache Spark 報告指標時,也會發送格式錯誤的行程和票價資料的自訂指標。

以下是可在 Azure 記錄分析工作區中用於監視流程作業執行情況的範例查詢。 每個查詢中的參數 ago(1d) 將傳回最後一天產生的所有記錄,並且可以調整以查看不同的時間段。

串流查詢執行期間記錄的異常

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

錯誤票價和乘車資料的累積

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

隨著時間的推移作業執行

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

有關詳細資訊,請參閱監視 Azure Databricks

DevOps

  • 為生產、開發和測試環境建立單獨的資源組。 單獨的資源群組可以更輕鬆地管理部署、刪除測試部署和指派存取權限。

  • 使用 Azure 資源管理器範本依照基礎架構即程式碼 (IaC) 流程部署 Azure 資源。 透過模板,使用 Azure DevOps 服務或其他 CI/CD 解決方案進行自動化部署變得更加容易。

  • 將每個工作負載放入單獨的部署範本中,並將資源儲存在來源控制系統中。 您可以將範本一起部署或單獨部署作為 CI/CD 流程的一部分,從而使自動化流程更加輕鬆。

    在此體系架構中,Azure 事件中心、記錄分析和 Azure Cosmos DB 被識別為單一工作負載。 這些資源包含在單一 ARM 範本中。

  • 考慮暫存您的工作負載。 部署到各個階段並在每個階段執行驗證檢查,然後再進入下一階段。 這樣您就可以以高度受控的方式將更新推送到生產環境,並最大限度地減少意外的部署問題。

    在此架構中存在多個部署階段。 考慮建立 Azure DevOps 管線並新增這些階段。 以下是您可以自動化的階段的一些範例:

    • 啟動 Databricks 叢集
    • 設定 Databricks CLI
    • 安裝 Scala 工具
    • 新增 Databricks 秘密

    另外,考慮編寫自動化整合測試來提高 Databricks 程式碼及其生命週期的品質和可靠性。

  • 考慮使用 Azure 監視器來分析流處理管線的效能。 有關詳細資訊,請參閱監視 Azure Databricks

有關詳細資訊,請參閱 Microsoft Azure 架構完善的框架中的 DevOps 部分。

成本最佳化

成本最佳化是關於考慮如何減少不必要的費用,並提升營運效率。 如需詳細資訊,請參閱成本最佳化支柱的概觀

使用 Azure 定價計算機來預估成本。 以下是此參考架構中使用的服務的一些注意事項。

事件中樞

此參考架構在標準層中部署事件中心。 定價模型基於輸送量單位、輸入事件和擷取事件。 「輸入事件」是 64 KB 或更小的資料單位。 較大訊息以 64 KB 的倍數計費。 您可以透過 Azure 入口網站或事件中心管理 API 指定輸送量單位。

如果您需要更多保留天數,請考慮專用層。 此層提供具有最嚴苛要求的單一租用戶部署。 此產品基於不受輸送量單位約束的容量單位 (CU) 建構集群。

標準層也根據輸入事件和輸送量單位進行計費。

有關事件中心定價的資訊,請參閱事件中心定價

Azure Databricks

Azure Databricks 提供標準版進階版兩層,每層支援三種工作負載。 此參考體系結構在高階層中部署 Azure Databricks 工作區。

資料工程資料工程輕量工作負載供資料工程師建置和執行作業。 資料分析工作負載旨在幫助資料科學家以互動方式探索、視覺化、操作和共享資料和深入解析。

Azure Databricks 提供多種定價模型。

  • 隨用隨付方案

    您需要根據所選的 VM 執行個體為叢集和 Databricks 單元 (DBU) 中設定的虛擬機器 (VM) 付費。 DBU 是處理能力的單位,以每秒使用量計費。 DBU 耗用量取決於執行 Azure Databricks 的執行個體大小和類型。 定價將取決於所選的工作負載和層級。

  • 購買方案

    您將 Azure Databricks 單位 (DBU) 作為 Databricks 提交單位 (DBCU) 承諾一年或三年。 與即用即付模式相比,您最多可以節省 37%。

有關詳細資訊,請參閱 Azure Databricks 定價

Azure Cosmos DB

在此體系結構中,Azure Databricks 作業將一系列記錄寫入 Azure Cosmos DB。 您需要為預留的用於執行插入操作的容量付費,以每秒要求單位 (RU/s) 表示。 計費單位為每小時 100 RU/秒。 例如,寫入 100 KB 專案的成本為 50 RUn。

對於寫入操作,請設定足夠的容量來支援每秒所需的寫入次數。 可以在執行寫入作業之前使用入口網站或 Azure CLI 增加預配輸送量,然後在這些作業完成後減少輸送量。 寫入期間的運送量是給定資料所需的最小運送量加上插入操作所需的運送量 (假設沒有其他工作負載正在執行)。

成本分析範例

假設您在容器上設定的吞吐量值為 1,000 RU/秒。 部署時間為 30 天 24 小時,共 720 小時。

容器以每小時 10 個單位、每小時 100 RU/秒計費。 10 個單位的收費為 0.008 美元 (每 100 RU/秒每小時),每小時收費 0.08 美元。

對於 720 小時或 7,200 個單位 (100 個 RU),您每月需要支付 57.60 美元。

儲存空間也按用於儲存資料和索引的每 GB 計費。 有關詳細資訊,請參閱 Azure Cosmos DB 定價模型

使用 Azure Cosmos DB 容量計算機來快速估計工作負載成本。

有關詳細資訊,請參閱 Microsoft Azure 架構完善的框架中的 DevOps 成本部分。

部署此案例

若要部署並執行參考實現,請按照 GitHub 自述文件中的步驟操作。

後續步驟