Editar

Compartir vía


Procesamiento de flujos de datos con Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Esta arquitectura de referencia muestra una canalización de procesamiento de flujos de datos de un extremo a otro. Este tipo de canalización tiene cuatro fases: ingesta, proceso, almacenamiento y análisis e informes. Para esta arquitectura de referencia, la canalización ingiere datos de dos orígenes, realiza una combinación de los registros relacionados de cada flujo, enriquece el resultado y calcula un promedio en tiempo real. Los resultados se almacenan para su posterior análisis.

GitHub logo Hay disponible una implementación de referencia de esta arquitectura en GitHub.

Architecture

Diagrama que muestra una arquitectura de referencia para el procesamiento de flujos con Azure Databricks.

Descargue un archivo de Visio de esta arquitectura.

Flujo de trabajo

La arquitectura consta de los siguientes componentes:

Orígenes de datos. En esta arquitectura, hay dos orígenes de datos que generan flujos de datos en tiempo real. El primer flujo de datos contiene información sobre la carrera y, el segundo, contiene información sobre las tarifas. La arquitectura de referencia incluye un generador de datos simulados que lee un conjunto de archivos estáticos e inserta los datos en Event Hubs. En una aplicación real, los orígenes de datos serían los dispositivos instalados en el taxi.

Azure Event Hubs. Event Hubs es un servicio de ingesta de eventos. Esta arquitectura emplea dos instancias de centro de eventos, uno para cada origen de datos. Cada origen de datos envía un flujo de datos al centro de eventos asociado.

Azure Databricks. Azure Databricks es una plataforma de análisis basada en Apache Spark optimizada para la plataforma de servicios en la nube de Microsoft Azure. Databricks se utiliza para correlacionar los datos de la carrera de taxi y las tarifas, y también para enriquecer los datos correlacionados con los datos de los barrios almacenados en el sistema de archivos de Databricks.

Azure Cosmos DB. La salida de un trabajo de Azure Databricks es una serie de registros, que se escriben en Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra se usa porque admite el modelado de datos de serie temporal.

Azure Log Analytics. Los datos de registro de aplicaciones que recopila Azure Monitor se almacenan en un área de trabajo de Log Analytics. Las consultas de Log Analytics pueden usarse para analizar y ver las métricas, e inspeccionar los mensajes de registro para identificar problemas dentro de la aplicación.

Detalles del escenario

Escenario: Una empresa de taxi recopila los datos acerca de cada carrera de taxi. En este escenario, se supone que hay dos dispositivos independientes que envían datos. El taxi tienen un medidor que envía la información acerca de cada carrera: duración, distancia y ubicaciones de recogida y destino. Un dispositivo independiente acepta los pagos de clientes y envía los datos sobre las tarifas. Con el fin de identificar las tendencias, la empresa de taxi desea calcular el promedio de propinas por milla conducida, en tiempo real, en cada barrio.

Ingesta de datos

Para simular un origen de datos, esta arquitectura de referencia usa el conjunto de datos New York City Taxi Data[1]. Este conjunto de datos contiene datos acerca de carreras de taxi en la ciudad de Nueva York durante un período de cuatro años (de 2010 a 2013). Contiene dos tipos de registros: datos de carreras y datos de tarifas. Los datos de carreras incluyen la duración del viaje, la distancia de viaje y la ubicación de recogida y destino. Los datos de tarifas incluyen las tarifas, los impuestos y las propinas. Los campos comunes en ambos tipos de registro son la placa y el número de licencia, y el identificador del proveedor. Juntos, estos tres campos identifican un taxi además del conductor. Los datos se almacenan en formato CSV.

[1] Donovan, Brian; Trabajo, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidad de Illinois en Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

El generador de datos es una aplicación de .NET Core que lee los registros y los envía a Azure Event Hubs. El generador envía los datos de carreras en formato JSON y los datos de tarifas en formato CSV.

Event Hubs usa particiones para segmentar los datos. Las particiones permiten a los consumidores leer cada partición en paralelo. Cuando se envían datos a Event Hubs, puede especificar explícitamente la clave de partición. En caso contrario, los registros se asignan a las particiones en modo round-robin.

En este escenario, los datos de carreras y los datos de tarifas deben terminar con el mismo identificador de partición para un taxi determinado. Esto permite a Databricks aplicar un cierto paralelismo cuando se establece una correlación entre los dos flujos. Un registro en la partición n de los datos de carreras coincidirá con un registro en la partición de datos n de los datos de tarifas.

Diagrama de procesamiento de flujos de datos con Azure Databricks y Event Hubs.

Descargue un archivo de Visio de esta arquitectura.

En el generador de datos, el modelo de datos común para ambos tipos de registro tiene una propiedad PartitionKey que es la concatenación de Medallion, HackLicense y VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propiedad se utiliza para proporcionar una clave de partición explícita cuando se realizan envíos a Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

La capacidad de procesamiento de Event Hubs se mide en unidades de procesamiento. Para escalar un centro de eventos automáticamente, puede habilitar el inflado automático, que permite escalar las unidades de procesamiento en función del tráfico, hasta un máximo configurado.

Procesamiento de flujos

En Azure Databricks, el procesamiento de datos se realiza mediante un trabajo. El trabajo está asignado a un clúster y se ejecuta en él. El trabajo puede ser código personalizado escrito en Java o un cuaderno de Spark.

En esta arquitectura de referencia, el trabajo es un archivo de Java con clases escritas tanto en Scala como en Java. Al especificar el archivo de Java para un trabajo de Databricks, se especifica la clase para que el clúster de Databricks lo ejecute. Aquí, el método main de la clase com.microsoft.pnp.TaxiCabReader contiene la lógica de procesamiento de datos.

Lectura del flujo de datos de las dos instancias del centro de eventos

La lógica de procesamiento de datos usa streaming estructurado de Spark para leer de las dos instancias de Azure Event Hub:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Enriquecimiento de los datos con la información de los barrios

Los datos de las carreras incluyen las coordenadas de latitud y longitud de las ubicaciones de origen y destino. Aunque estas coordenadas son útiles, no se consumen fácilmente en el análisis. Por lo tanto, estos datos se enriquecen con los datos de los barrios, que se leen desde un archivo de forma.

El formato del archivo de forma es binario y no se analiza fácilmente, pero la biblioteca GeoTools proporciona herramientas para que los datos geoespaciales usen el archivo de forma. Esta biblioteca se utiliza en la clase com.microsoft.pnp.GeoFinder para determinar el nombre del barrio en función de las coordenadas de origen y destino.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Combinación de los datos de carreras y tarifas

En primer lugar, se transforman los datos de carreras y tarifas:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Y, a continuación, los datos de las carreras se combinan con los datos de las tarifas:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Procesamiento de los datos e inserción en Azure Cosmos DB

La tarifa media para cada barrio se calcula para un intervalo de tiempo determinado:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Después, se inserta en Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Consideraciones

Estas consideraciones implementan los pilares del marco de buena arquitectura de Azure, que es un conjunto de principios guía que se pueden usar para mejorar la calidad de una carga de trabajo. Para más información, consulte Marco de buena arquitectura de Microsoft Azure.

Seguridad

La seguridad proporciona garantías contra ataques deliberados y el abuso de datos y sistemas valiosos. Para obtener más información, vea Lista de comprobación de revisión de diseño para security.

El acceso al área de trabajo de Azure Databricks se controla mediante la consola de administrador de La consola de administrador incluye funcionalidad para agregar usuarios, administrar permisos de usuario y configurar el inicio de sesión único. El control del acceso para las áreas de trabajo, los clústeres, los trabajos y las tablas también se puede establecer en la consola de administrador.

Administración de secretos

Azure Databricks incluye un almacén de secretos que se usa para almacenar credenciales y hacer referencia a ellas en cuadernos y trabajos. Los secretos del almacén de secretos de Azure Databricks se particionan por ámbitos:

databricks secrets create-scope --scope "azure-databricks-job"

Los secretos se agregan en el nivel de ámbito:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota:

Se debería utilizar un ámbito respaldado por Azure Key Vault en lugar del ámbito nativo de Azure Databricks. Para más información, consulte el artículo sobre los ámbitos respaldados por Azure Key Vault.

En el código, se obtiene acceso a los secretos mediante las utilidades de secretos de Azure Databricks.

Optimización de costos

La optimización de costos trata de buscar formas de reducir los gastos innecesarios y mejorar las eficiencias operativas. Para más información, vea Información general del pilar de optimización de costos.

Puede usar la calculadora de precios de Azure para calcular los costos. Estas son algunas consideraciones sobre los servicios que se usan en esta arquitectura de referencia.

Consideraciones sobre los costos de Event Hubs

Esta arquitectura de referencia implementa Event Hubs en el nivel Estándar. El modelo de precios se basa en las unidades de procesamiento, los eventos de entrada y los eventos de captura. Un evento de entrada es una unidad de datos de 64 KB o menos. Los mensajes más grandes se facturan en múltiplos de 64 KB. Las unidades de procesamiento se especifican a través de Azure Portal o de las API de administración de Events Hubs.

Si necesita más días de retención, considere la posibilidad de suscribirse al nivel Dedicado. Este nivel ofrece implementaciones de un solo inquilino con los requisitos más exigentes. Esta oferta crea un clúster basado en unidades de capacidad que no está limitado por las unidades de procesamiento.

El nivel Estándar se factura también en función de los eventos de entrada y las unidades de procesamiento.

Para más información sobre los precios de Event Hubs, consulte los precios de Event Hubs.

Consideraciones sobre los costos de Azure Databricks

Azure Databricks ofrece dos niveles, Estándar y Premium, y cada uno admite tres cargas de trabajo. Esta arquitectura de referencia implementa el área de trabajo de Azure Databricks en el nivel Premium.

Las cargas de trabajo de ingeniería de datos deben ejecutarse en un clúster de trabajos y son para que los ingenieros de datos compilen y ejecuten trabajos. Las cargas de trabajo de análisis de datos deben ejecutarse en un clúster de uso completo y están diseñadas para que los científicos de datos exploren, visualicen, manipulen y compartan datos e información de forma interactiva.

Azure Databricks ofrece muchos modelos de precios.

  • Plan de pago por uso

    Se le facturan las máquinas virtuales (VM) aprovisionadas en clústeres y unidades de Databricks (dBu) basadas en la instancia de máquina virtual seleccionada. Una DBU es una unidad de capacidad de procesamiento, facturada en un uso por segundo. El consumo de DBU depende del tamaño y el tipo de instancia que se ejecuta Azure Databricks. Los precios dependerán de la carga de trabajo y el nivel seleccionados.

  • Plan de compra previa

    Se compromete a unidades de Azure Databricks (DBU) como unidades de confirmación de Databricks (DBCU) durante uno o tres años para reducir el costo total de propiedad durante ese período de tiempo en comparación con el modelo de pago por uso.

Para más información, consulte Precios de Azure Databricks.

Consideraciones sobre los costos de Azure Cosmos DB

En esta arquitectura, el trabajo de Azure Databricks escribe una serie de registros en Azure Cosmos DB. Se le cobra por la capacidad reservada, expresada en unidades de solicitud por segundo (RU/s), que se usa para realizar operaciones de inserción. La unidad de facturación es 100 RU/s por hora. Por ejemplo, el costo de escribir elementos de 100 KB es de 50 RU/s.

En cuanto a operaciones de escritura, aprovisione capacidad suficiente para admitir el número de escrituras necesarias por segundo. Puede aumentar el rendimiento aprovisionado mediante el portal o la CLI de Azure antes de realizar operaciones de escritura y, a continuación, reducir el rendimiento una vez finalizadas esas operaciones. El rendimiento durante el período de escritura es el rendimiento mínimo necesario para los datos especificados más el rendimiento necesario para la operación de inserción, suponiendo que no haya otras cargas de trabajo en ejecución.

Ejemplo de análisis de costos

Supongamos que configura un valor de rendimiento de 1000 RU/s en un contenedor. Se implementa durante 24 horas durante 30 días, un total de 720 horas.

El contenedor se factura a 10 unidades de 100 RU/s por hora para cada hora. 10 unidades a 0,008 $ (por 100 RU/s por hora) se cobran a 0,08 $ por hora.

En 720 horas o 7200 unidades (de 100 RU), se le cobrarán 57,60 $ al mes.

El almacenamiento se factura también por cada GB usado para los datos e índice almacenados. Para más información, consulte el modelo de precios de Azure Cosmos DB.

Use la calculadora de capacidad de Azure Cosmos DB para obtener una estimación rápida del costo de la carga de trabajo.

Excelencia operativa

La excelencia operativa abarca los procesos de operaciones que implementan una aplicación y lo mantienen en ejecución en producción. Para obtener más información, vea Lista de comprobación de revisión de diseño para la excelencia operativa.

Supervisión

Azure Databricks se basa en Apache Spark, y ambos usan log4j como la biblioteca estándar para el registro. Además del registro predeterminado proporcionado por Apache Spark, puede implementar el registro en Azure Log Analytics siguiendo el artículo Supervisión de Azure Databricks.

Como la clase com.microsoft.pnp.TaxiCabReader procesa los mensajes de carreras y tarifas, es posible que alguno sea incorrecto y, por tanto, no sea válido. En un entorno de producción, es importante analizar estos mensajes incorrectamente formados para identificar un problema con los orígenes de datos, para poder corregirlos rápidamente y evitar la pérdida de datos. La clase com.microsoft.pnp.TaxiCabReader registra un acumulador de Apache Spark que controla el número de registros de carreras y tarifas con formato incorrecto:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark utiliza la biblioteca Dropwizard para enviar métricas, y algunos de los campos de métricas nativos de Dropwizard no son compatibles con Azure Log Analytics. Por lo tanto, esta arquitectura de referencia incluye un receptor y un notificador de Dropwizard personalizados. Da formato a las métricas con el formato que Azure Log Analytics espera. Cuando Apache Spark informa de las métricas, también se envían las métricas personalizadas de los datos de carreras y tarifas con formato incorrecto

A continuación, se muestra un ejemplo de consultas que puede usar en el área de trabajo de Azure Log Analytics para supervisar la ejecución del trabajo de streaming. El argumento ago(1d) de cada consulta devolverá todos los registros generados en el último día y se puede ajustar para ver un período de tiempo diferente.

Excepciones registradas durante la ejecución de consultas de flujos de datos

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulación de datos de carrera y tarifas con formato incorrecto

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Ejecución de trabajos a lo largo del tiempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Para obtener más información, consulte Supervisión de Azure Databricks.

Organización e implementaciones de recursos

  • Cree grupos de recursos independientes para entornos de producción, desarrollo y pruebas. Los grupos de recursos independientes facilitan la administración de implementaciones, la eliminación de implementaciones de prueba y la asignación de derechos de acceso.

  • Use la plantilla de Azure Resource Manager para implementar los recursos de Azure según el proceso de infraestructura como código (IaC). Con las plantillas, es más fácil automatizar las implementaciones mediante Azure DevOps Services u otras soluciones de CI/CD.

  • Coloque cada carga de trabajo en una plantilla de implementación independiente y almacene los recursos en los sistemas de control de código fuente. Puede implementar las plantillas en conjunto o por separado como parte de un proceso de CI/CD, lo que facilita el proceso de automatización.

    En esta arquitectura, Azure Event Hubs, Log Analytics y Azure Cosmos DB se identifican como una sola carga de trabajo. Estos recursos se incluyen en una sola plantilla de Resource Manager.

  • Considere la posibilidad de almacenar provisionalmente las cargas de trabajo. Realice la implementación en varias fases y ejecute comprobaciones de validación en cada fase antes de pasar a la siguiente fase. De este modo, puede enviar actualizaciones a los entornos de producción de una manera muy controlada y minimizar los problemas de implementación imprevistos.

    En esta arquitectura hay varias fases de implementación. Considere la posibilidad de crear una canalización de Azure DevOps y agregar esas fases. Estos son algunos ejemplos de fases que se pueden automatizar:

    • Inicio de un clúster de Databricks
    • Configuración de la CLI de Databricks
    • Instalación de herramientas de Scala
    • Incorporación de los secretos de Databricks

    Además, considere la posibilidad de escribir pruebas automatizadas de integración para mejorar la calidad y la confiabilidad del código de Databricks y su ciclo de vida.

Implementación de este escenario

Para la implementación y la ejecución de la implementación de referencia, siga los pasos del archivo Léame de GitHub.

Paso siguiente