你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure Databricks 进行流处理

Azure Cosmos DB
Azure Databricks
Azure 事件中心
Azure Log Analytics
Azure Monitor

本参考体系结构演示一个端到端流处理管道。 此管道的四个阶段是引入、处理、存储和分析和报告。 本参考体系结构中的管道从两个源引入数据,针对每个流中的相关记录执行联接,丰富结果,然后实时计算平均值。 然后存储结果以供进一步分析。

GitHub logo GitHub 中提供了本体系结构的参考实现。

体系结构

关系图,其中显示了使用 Azure Databricks 进行流处理的参考体系结构。

下载该架构的 Visio 文件

工作流

以下数据流对应于上图:

  1. 在此体系结构中,有两个数据源实时生成数据流。 第一个流包含行程信息,第二个流包含票价信息。 参考体系结构包括一个模拟数据生成器,该生成器从一组静态文件读取数据并将数据推送到 Azure 事件中心。 实际应用程序中的数据源是在出租车上安装的设备。

  2. 事件中心 是事件引入服务。 此体系结构使用两个事件中心实例,每个数据源各对应一个。 每个数据源将数据流发送到关联的事件中心。

  3. Azure Databricks 是一个基于 Apache Spark 的分析平台,已针对 Microsoft Azure 云服务平台进行优化。 Azure Databricks 用于关联出租车行程和票价数据,并将相关数据与存储在 Azure Databricks 文件系统中的邻里数据进行扩充。

  4. Azure Cosmos DB 是完全托管的多模型数据库服务。 Azure Databricks 作业的输出是写入到 Azure Cosmos DB for Apache Cassandra 的一系列记录。 使用 Azure Cosmos DB for Apache Cassandra 是因为它支持时序数据建模。

    • Azure Synapse Link for Azure Cosmos DB 使你能够对 Azure Cosmos DB 中的作数据运行近实时分析,而不会影响事务工作负荷的性能或成本。 可以使用 无服务器 SQL 池Spark 池来实现这些结果。 这些分析引擎可从 Azure Synapse Analytics 工作区获取。

    • Microsoft Fabric 中的用于 NoSQL 的 Azure Cosmos DB 镜像,使你能够将 Azure Cosmos DB 数据与 Microsoft Fabric 中的其余数据集成。

  5. Log Analytics 是 Azure Monitor 中的一种工具,可用于查询和分析来自各种源的日志数据。 Azure Monitor 收集的应用程序日志数据存储在 Log Analytics 工作区中。 可以使用 Log Analytics 查询分析和可视化指标,并检查日志消息以识别应用程序中的问题。

方案详细信息

出租车公司收集有关每个出租车行程的数据。 对于此方案,我们假定两个单独的设备发送数据。 出租车有一个计量器,用于发送有关每辆车的信息,包括持续时间、距离和上车和下车地点。 有一个单独的设备接受客户的付款,并发送有关费用的数据。 为了发现骑手趋势,出租车公司希望实时计算每个街区每英里行驶的平均小费。

数据引入

为了模拟数据源,此参考体系结构使用 纽约市出租车数据数据集1。 此数据集包含 2010 年到 2013 年纽约市出租车行程的数据。 它包含行程和费用数据记录。 行程数据包括行程持续时间、行程距离以及上车和下车位置。 费用数据包括乘车费、税费和小费金额。 这两种记录类型的字段包括奖牌编号、黑客许可证和供应商 ID。 这三个字段的组合唯一标识出租车和司机。 数据以 CSV 格式存储。

[1] Donovan, Brian;Work, Dan (2016):纽约市出租车行程数据 (2010-2013)。 伊利诺伊大学厄巴纳-香槟分校。 https://doi.org/10.13012/J8PN93H8

数据生成器是一个 .NET Core 应用程序,用于读取记录并将其发送到事件中心。 该生成器发送 JSON 格式的行程数据以及 CSV 格式的费用数据。

事件中心使用分区将数据分段。 使用者可以通过分区功能并行读取每个分区。 将数据发送到事件中心时,可以直接指定分区键。 否则,记录将以循环方式分配到分区。

在此方案中,应为特定出租车分配相同的分区 ID 来分配行程数据和费用数据。 此分配使 Databricks 能够在将两个流关联时应用一定程度的并行度。 例如,行程数据的分区 n 中的记录与费用数据的分区 n 中的记录匹配。

使用 Azure Databricks 和事件中心进行的流处理的图。

下载此体系结构的 Visio 文件

在数据生成器中,这两种记录类型的通用数据模型具有 PartitionKey 属性,该属性是 MedallionHackLicenseVendorId 的串联形式。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

此属性用于在将数据发送到事件中心时提供显式分区键。

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

事件中心

事件中心的吞吐量容量以吞吐量单位来度量。 可以通过启用 自动膨胀来自动缩放事件中心。 此功能根据流量自动缩放吞吐量单位,最大吞吐量单位数上限。

流处理

在 Azure Databricks 中,作业执行数据处理。 作业将分配到群集,然后在群集上运行。 该作业可以是用 Java 编写的自定义代码,也可以是 Spark 笔记本

在此参考体系结构中,作业是一个 Java 存档,其中包含用 Java 和 Scala 编写的类。 为 Databricks 作业指定 Java 存档时,Databricks 群集指定用于作的类。 此处,main 类的 com.microsoft.pnp.TaxiCabReader 方法包含数据处理逻辑。

从两个事件中心实例读取流

数据处理逻辑使用 Spark 结构化流从两个 Azure 事件中心实例读取数据:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

使用邻里信息丰富数据

行程数据包括取件和下车位置的纬度和经度坐标。 这些坐标非常有用,但不容易用于分析。 因此,此数据使用从 形状文件读取的邻里数据进行扩充。

shapefile 格式是二进制格式,并且不容易分析。 但是,GeoTools 库为使用形状文件格式的地理空间数据提供了工具。 此库用于 com.microsoft.pnp.GeoFinder 类,根据取件和下车位置的坐标确定邻里名称。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

联接行程和费用数据

首先转换行程数据和费用数据:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

然后,行程数据与票价数据联接在一起:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

处理数据并将其插入 Azure Cosmos DB

每个邻里的平均票价金额是针对特定时间间隔计算的:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

然后,将平均费用金额插入 Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

注意事项

这些注意事项实施 Azure 架构良好的框架的支柱原则,即一套可用于改进工作负荷质量的指导原则。 有关详细信息,请参阅 Microsoft Azure 架构良好的框架

安全性

安全性提供针对故意攻击和滥用宝贵数据和系统的保证。 有关详细信息,请参阅 安全的设计评审清单。

使用 管理员控制台控制对 Azure Databricks 工作区的访问。 管理员控制台包括添加用户、管理用户权限和设置单一登录的功能。 还可以通过管理员控制台设置工作区、群集、作业和表的访问控制。

管理机密

Azure Databricks 包括一个 机密存储,用于存储凭据并将其引用在笔记本和作业中。 作用域 Azure Databricks 机密存储中的分区机密:

databricks secrets create-scope --scope "azure-databricks-job"

机密是在范围级别添加的:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

注意

使用 Azure Key Vault 支持的作用域 而不是本机 Azure Databricks 范围。

在代码中,可以通过 Azure Databricks 机密实用工具访问机密。

成本优化

成本优化侧重于减少不必要的开支和提高运营效率的方法。 有关详细信息,请参阅 成本优化的设计评审清单。

使用 Azure 定价计算器估算成本。 请考虑此参考体系结构中使用的以下服务。

事件中心成本注意事项

此参考体系结构在标准层中部署事件中心。 定价模型基于吞吐量单位、入口事件和捕获事件。 入口事件是 64 KB 或更少数据的单位。 较大的消息按 64 KB 的倍数计费。 可以通过 Azure 门户或事件中心管理 API 指定吞吐量单位。

如果需要更多保留日,请考虑专用层。 此层提供具有严格要求的单租户部署。 此产品/服务构建的群集基于容量单位,不依赖于吞吐量单位。 标准层也根据入口事件和吞吐量单位计费。

有关详细信息,请参阅 事件中心定价

Azure Databricks 成本注意事项

Azure Databricks 提供标准层和高级层,这两个层都支持三个工作负荷。 此参考体系结构在高级层中部署 Azure Databricks 工作区。

数据工程工作负荷应在作业群集上运行。 数据工程师使用群集来生成和执行作业。 数据分析工作负载应在通用群集上运行,旨在让数据科学家以交互方式浏览、可视化、作和共享数据和见解。

Azure Databricks 提供了多个定价模型。

  • 即用即付计划

    根据所选 VM 实例,针对群集和 Azure Databricks 单元(DBU)中预配的虚拟机(VM)计费。 DBU 是按每秒使用量计费的一项处理功能。 DBU 消耗取决于 Azure Databricks 中运行的实例的大小和类型。 定价取决于所选的工作负荷和层。

  • 预购计划

    将 DBU 作为 Azure Databricks 提交单位提交一年或三年,以降低与即用即付模型相比,在此期间的总拥有成本。

有关详细信息,请参阅 Azure Databricks 定价

Azure Cosmos DB 成本注意事项

在此体系结构中,Azure Databricks 作业将一系列记录写入 Azure Cosmos DB。 你对预留的容量收费,该容量以每秒请求单位(RU/秒)为单位。 此容量用于执行插入作。 计费单位为每小时 100 RU/秒。 例如,写入 100 KB 项的成本为 50 RU/秒。

对于写入操作,预配足够的容量以支持每秒所需的写入次数。 在执行写入作之前,可以使用门户或 Azure CLI 来增加预配的吞吐量,然后在完成这些作后减少吞吐量。 写入周期的吞吐量是特定数据所需的最小吞吐量和插入作所需的吞吐量之和。 此计算假定没有其他正在运行的工作负荷。

成本分析示例

假设在容器上配置吞吐量值 1,000 RU/秒。 它部署了 24 小时 30 天,总共 720 小时。

该容器按每小时 10 个单位 100 RU/秒计费,每小时计费。 每小时 0.008 美元(每 100 RU/秒)的 10 个单位按每小时 0.08 美元收费。

对于 720 小时或 7,200 个单位(共 100 个 RU),你本月的账单为 57.60 美元。

存储也针对用于存储数据和索引的每个 GB 计费。 有关详细信息,请参阅 Azure Cosmos DB 定价模型

使用 Azure Cosmos DB 容量计算器 快速估算工作负荷成本。

卓越运营

卓越运营涵盖部署应用程序并使其在生产环境中运行的运营流程。 有关详细信息,请参阅 卓越运营的设计评审清单。

监视

Azure Databricks 基于 Apache Spark。 Azure Databricks 和 Apache Spark 都使用 Apache Log4j 作为用于日志记录的标准库。 除了 Apache Spark 提供的默认日志记录之外,还可以在 Log Analytics 中实现日志记录。 有关详细信息,请参阅监视 Azure Databricks

由于 com.microsoft.pnp.TaxiCabReader 类处理行程和票价消息,因此消息的格式可能不正确,因此无效。 在生产环境中,必须分析这些格式不正确的消息,以确定数据源的问题,以便可以快速修复,以防止数据丢失。 com.microsoft.pnp.TaxiCabReader 类注册了一个 Apache Spark 累积器,用于跟踪格式不正确的票价记录和行程记录的数量:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark 使用 Dropwizard 库发送指标。 某些本机 Dropwizard 指标字段与 Log Analytics 不兼容,这就是为什么此参考体系结构包含自定义 Dropwizard 接收器和记者的原因。 它采用 Log Analytics 预期格式设置指标的格式。 当 Apache Spark 报告指标时,也会发送格式不当的行程数据和费用数据的自定义指标。

可以在 Log Analytics 工作区中使用以下示例查询来监视流式处理作业的作。 每个查询中的参数 ago(1d) 返回最后一天生成的所有记录。 可以调整此参数以查看不同的时间段。

流查询作期间记录的异常

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

格式不当的费用和行程数据的累积数目

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

一段时间内的作业作

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

资源组织和部署

  • 为生产、开发和测试环境创建单独的资源组。 使用单独的资源组可以更方便地管理部署、删除测试部署,以及分配访问权限。

  • 使用 Azure 资源管理器模板 根据基础结构即代码过程部署 Azure 资源。 通过使用模板,可以轻松地使用 Azure DevOps services 或其他持续集成和持续交付(CI/CD)解决方案自动执行部署。

  • 将每个工作负载放在单独的部署模板中,并将资源存储在源代码管理系统中。 可以在 CI/CD 过程中统一或者逐个部署这些模板。 此方法简化了自动化过程。

    在此体系结构中,事件中心、Log Analytics 和 Azure Cosmos DB 标识为单个工作负荷。 这些资源包含在单个 Azure 资源管理器模板中。

  • 请考虑暂存工作负载。 部署到各个阶段并在每个阶段运行验证检查,然后再移动到下一阶段。 这样,便可以控制如何将更新推送到生产环境,并最大程度地减少意外的部署问题。

    在此体系结构中,有多个部署阶段。 请考虑创建 Azure DevOps 管道并添加这些阶段。 可以自动执行以下阶段:

    • 启动 Databricks 群集。
    • 配置 Databricks CLI。
    • 安装 Scala 工具。
    • 添加 Databricks 机密。

    请考虑编写自动化集成测试以提高 Databricks 代码及其生命周期的质量和可靠性。

部署此方案

若要部署和运行引用实现,请按照 GitHub 自述文件中的步骤作。

下一步