Rediger

Del via


Stream processing with Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

This reference architecture shows an end-to-end stream processing pipeline. The four stages of this pipeline are ingest, process, store, and analyze and report. For this reference architecture, the pipeline ingests data from two sources, performs a join on related records from each stream, enriches the result, and calculates an average in real time. The results are then stored for further analysis.

GitHub logo A reference implementation for this architecture is available on GitHub.

Architecture

Diagram that shows a reference architecture for stream processing with Azure Databricks.

Download a Visio file of this architecture.

Workflow

The following dataflow corresponds to the previous diagram:

  1. In this architecture, there are two data sources that generate data streams in real time. The first stream contains ride information, and the second stream contains fare information. The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Azure Event Hubs. The data sources in a real application are devices installed in the taxi cabs.

  2. Event Hubs is an event ingestion service. This architecture uses two event hub instances, one for each data source. Each data source sends a stream of data to the associated event hub.

  3. Azure Databricks is an Apache Spark-based analytics platform that's optimized for the Microsoft Azure cloud services platform. Azure Databricks is used to correlate the taxi ride and fare data and to enrich the correlated data with neighborhood data that's stored in the Azure Databricks file system.

  4. Azure Cosmos DB is a fully managed, multiple-model database service. The output of an Azure Databricks job is a series of records, which are written to Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra is used because it supports time series data modeling.

  5. Log Analytics is a tool within Azure Monitor that allows you to query and analyze log data from various sources. Application log data that Azure Monitor collects is stored in a Log Analytics workspace. You can use Log Analytics queries to analyze and visualize metrics and inspect log messages to identify problems within the application.

Scenario details

A taxi company collects data about each taxi trip. For this scenario, we assume that two separate devices send data. The taxi has a meter that sends information about each ride, including the duration, distance, and pickup and drop-off locations. A separate device accepts payments from customers and sends data about fares. To spot ridership trends, the taxi company wants to calculate the average tip per mile driven for each neighborhood, in real time.

Data ingestion

To simulate a data source, this reference architecture uses the New York City taxi data dataset1. This dataset contains data about taxi trips in New York City from 2010 to 2013. It contains both ride and fare data records. Ride data includes trip duration, trip distance, and the pickup and drop-off locations. Fare data includes fare, tax, and tip amounts. Fields in both record types include medallion number, hack license, and vendor ID. The combination of these three fields uniquely identifies a taxi and a driver. The data is stored in CSV format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

The data generator is a .NET Core application that reads the records and sends them to Event Hubs. The generator sends ride data in JSON format and fare data in CSV format.

Event Hubs uses partitions to segment the data. Partitions allow a consumer to read each partition in parallel. When you send data to Event Hubs, you can specify the partition key directly. Otherwise, records are assigned to partitions in round-robin fashion.

In this scenario, ride data and fare data should be assigned the same partition ID for a specific taxi cab. This assignment enables Databricks to apply a degree of parallelism when it correlates the two streams. For example, a record in partition n of the ride data matches a record in partition n of the fare data.

Diagram of stream processing with Azure Databricks and Event Hubs.

Download a Visio file of this architecture.

In the data generator, the common data model for both record types has a PartitionKey property that is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

This property is used to provide an explicit partition key when it sends data to Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

The throughput capacity of Event Hubs is measured in throughput units. You can autoscale an event hub by enabling auto-inflate. This feature automatically scales the throughput units based on traffic, up to a configured maximum.

Stream processing

In Azure Databricks, a job performs data processing. The job is assigned to a cluster and then runs on it. The job can be custom code written in Java or a Spark notebook.

In this reference architecture, the job is a Java archive that has classes written in Java and Scala. When you specify the Java archive for a Databricks job, the Databricks cluster specifies the class for operation. Here, the main method of the com.microsoft.pnp.TaxiCabReader class contains the data processing logic.

Read the stream from the two event hub instances

The data processing logic uses Spark structured streaming to read from the two Azure event hub instances:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Enrich the data with the neighborhood information

The ride data includes the latitude and longitude coordinates of the pickup and drop-off locations. These coordinates are useful but not easily consumed for analysis. Therefore, this data is enriched with neighborhood data that's read from a shapefile.

The shapefile format is binary and not easily parsed. But the GeoTools library provides tools for geospatial data that use the shapefile format. This library is used in the com.microsoft.pnp.GeoFinder class to determine the neighborhood name based on the coordinates for pickup and drop-off locations.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Join the ride and fare data

First the ride and fare data is transformed:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Then the ride data is joined with the fare data:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Process the data and insert it into Azure Cosmos DB

The average fare amount for each neighborhood is calculated for a specific time interval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

The average fare amount is then inserted into Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerations

These considerations implement the pillars of the Azure Well-Architected Framework, which is a set of guiding tenets that you can use to improve the quality of a workload. For more information, see Microsoft Azure Well-Architected Framework.

Security

Security provides assurances against deliberate attacks and the misuse of your valuable data and systems. For more information, see Design review checklist for Security.

Access to the Azure Databricks workspace is controlled by using the administrator console. The administrator console includes functionality to add users, manage user permissions, and set up single sign-on. Access control for workspaces, clusters, jobs, and tables can also be set through the administrator console.

Manage secrets

Azure Databricks includes a secret store that's used to store credentials and reference them in notebooks and jobs. Scopes partition secrets within the Azure Databricks secret store:

databricks secrets create-scope --scope "azure-databricks-job"

Secrets are added at the scope level:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Note

Use an Azure Key Vault-backed scope instead of the native Azure Databricks scope.

In code, secrets are accessed via the Azure Databricks secrets utilities.

Cost Optimization

Cost Optimization focuses on ways to reduce unnecessary expenses and improve operational efficiencies. For more information, see Design review checklist for Cost Optimization.

Use the Azure pricing calculator to estimate costs. Consider the following services used in this reference architecture.

Event Hubs cost considerations

This reference architecture deploys Event Hubs in the Standard tier. The pricing model is based on throughput units, ingress events, and capture events. An ingress event is a unit of data that's 64 KB or less. Larger messages are billed in multiples of 64 KB. You specify throughput units either through the Azure portal or Event Hubs management APIs.

If you need more retention days, consider the Dedicated tier. This tier provides single-tenant deployments that have stringent requirements. This offering builds a cluster that's based on capacity units and isn't dependent on throughput units. The Standard tier is also billed based on ingress events and throughput units.

For more information, see Event Hubs pricing.

Azure Databricks cost considerations

Azure Databricks provides the Standard tier and the Premium tier, both of which support three workloads. This reference architecture deploys an Azure Databricks workspace in the Premium tier.

Data engineering workloads should run on a job cluster. Data engineers use clusters to build and perform jobs. Data analytics workloads should run on an all-purpose cluster and are intended for data scientists to explore, visualize, manipulate, and share data and insights interactively.

Azure Databricks provides multiple pricing models.

  • Pay-as-you-go plan

    You're billed for virtual machines (VMs) provisioned in clusters and Azure Databricks units (DBUs) based on the chosen VM instance. A DBU is a unit of processing capability that's billed by usage per second. The DBU consumption depends on the size and type of instance that runs in Azure Databricks. Pricing depends on the chosen workload and tier.

  • Pre-purchase plan

    You commit to DBUs as Azure Databricks commit units for either one or three years to reduce the total cost of ownership over that time period when compared to the pay-as-you-go model.

For more information, see Azure Databricks pricing.

Azure Cosmos DB cost considerations

In this architecture, the Azure Databricks job writes a series of records to Azure Cosmos DB. You're charged for the capacity that you reserve, which is measured in Request Units per second (RU/s). This capacity is used to perform insert operations. The unit for billing is 100 RU/s per hour. For example, the cost of writing 100-KB items is 50 RU/s.

For write operations, provision enough capacity to support the number of writes needed per second. You can increase the provisioned throughput by using the portal or Azure CLI before you perform write operations and then reducing the throughput after those operations are complete. Your throughput for the write period is the sum of the minimum throughput needed for the specific data and the throughput required for the insert operation. This calculation assumes that there's no other workload running.

Example cost analysis

Suppose you configure a throughput value of 1,000 RU/s on a container. It's deployed for 24 hours for 30 days, for a total of 720 hours.

The container is billed at 10 units of 100 RU/s per hour for each hour. Ten units at $0.008 (per 100 RU/s per hour) are charged at $0.08 per hour.

For 720 hours or 7,200 units (of 100 RUs), you're billed $57.60 for the month.

Storage is also billed for each GB that's used for your stored data and index. For more information, see Azure Cosmos DB pricing model.

Use the Azure Cosmos DB capacity calculator for a quick estimate of the workload cost.

Operational Excellence

Operational Excellence covers the operations processes that deploy an application and keep it running in production. For more information, see Design review checklist for Operational Excellence.

Monitoring

Azure Databricks is based on Apache Spark. Both Azure Databricks and Apache Spark use Apache Log4j as the standard library for logging. In addition to the default logging that Apache Spark provides, you can implement logging in Log Analytics. For more information, see Monitoring Azure Databricks.

As the com.microsoft.pnp.TaxiCabReader class processes ride and fare messages, a message might be malformed and therefore not valid. In a production environment, it's important to analyze these malformed messages to identify a problem with the data sources so that it can be fixed quickly to prevent data loss. The com.microsoft.pnp.TaxiCabReader class registers an Apache Spark Accumulator that tracks the number of malformed fare records and ride records:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark uses the Dropwizard library to send metrics. Some of the native Dropwizard metrics fields are incompatible with Log Analytics, which is why this reference architecture includes a custom Dropwizard sink and reporter. It formats the metrics in the format that Log Analytics expects. When Apache Spark reports metrics, the custom metrics for the malformed ride and fare data are also sent.

You can use the following example queries in your Log Analytics workspace to monitor the operation of the streaming job. The argument ago(1d) in each query returns all records that were generated in the last day. You can adjust this parameter to view a different time period.

Exceptions logged during stream query operation

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Accumulation of malformed fare and ride data

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Job operation over time

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Resource organization and deployments

  • Create separate resource groups for production, development, and test environments. Separate resource groups make it easier to manage deployments, delete test deployments, and assign access rights.

  • Use the Azure Resource Manager template to deploy the Azure resources according to the infrastructure-as-code process. By using templates, you can easily automate deployments with Azure DevOps services or other continuous integration and continuous delivery (CI/CD) solutions.

  • Put each workload in a separate deployment template and store the resources in source control systems. You can deploy the templates together or individually as part of a CI/CD process. This approach simplifies the automation process.

    In this architecture, Event Hubs, Log Analytics, and Azure Cosmos DB are identified as a single workload. These resources are included in a single Azure Resource Manager template.

  • Consider staging your workloads. Deploy to various stages and run validation checks at each stage before you move to the next stage. That way you can control how you push updates to your production environments and minimize unanticipated deployment problems.

    In this architecture, there are multiple deployment stages. Consider creating an Azure DevOps pipeline and adding those stages. You can automate the following stages:

    • Start a Databricks cluster.
    • Configure Databricks CLI.
    • Install Scala tools.
    • Add the Databricks secrets.

    Consider writing automated integration tests to improve the quality and reliability of the Databricks code and its lifecycle.

Deploy this scenario

To deploy and run the reference implementation, follow the steps in the GitHub readme.

Next step