Cette architecture de référence présente un pipeline de traitement de flux de bout en bout. Ce type de pipeline comprend quatre étapes : ingestion, traitement, stockage, analyse et création de rapports. Pour cette architecture de référence, le pipeline ingère des données issues de deux sources, effectue une jonction sur les enregistrements apparentés de chaque flux, enrichit le résultat et calcule une moyenne en temps réel. Les résultats sont stockés en vue d’une analyse plus approfondie.
Une implémentation de référence de cette architecture est disponible sur GitHub.
Architecture
Téléchargez un fichier Visio de cette architecture.
Workflow
L’architecture est constituée des composants suivants :
Sources de données. Dans cette architecture, deux sources de données génèrent des flux de données en temps réel. Le premier flux de données contient des informations sur les courses, tandis que le second contient des informations sur les tarifs. L’architecture de référence comprend un générateur de données simulées qui lit le contenu d’un ensemble de fichiers statiques et envoie (push) les données vers Event Hubs. Les sources de données dans une application réelle seraient les appareils installés dans les taxis.
Azure Event Hubs. Event Hubs est un service d’ingestion d’événements. Cette architecture utilise deux instances d’Event Hub, à savoir une par source de données. Chaque source de données envoie un flux de données à l’Event Hub associé.
Azure Databricks. Databricks est une plateforme d’analytique basée sur Apache Spark et optimisée pour la plateforme de services cloud Microsoft Azure. Databricks est utilisé pour mettre en corrélation les données de courses et de tarifs des taxis, mais aussi pour enrichir les données mises en corrélation avec les données de quartiers stockées dans le système de fichiers Databricks.
Azure Cosmos DB. La sortie d’un travail Azure Databricks est une série d’enregistrements écrits dans Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra est utilisé, car il prend en charge la modélisation des données de série chronologique.
Azure Synapse Link pour Azure Cosmos DB vous permet d’effectuer de l’analytique en quasi temps réel sur les données opérationnelles dans Azure Cosmos DB, sans aucun impact en termes de perfomances ou de coûts sur votre charge de travail transactionnelle, en utilisant les deux moteurs d’analytique disponibles depuis votre espace de travail Azure Synapse : SQL Serverless et des pools Spark.
Microsoft Fabric Mirroring Azure Cosmos DB pour NoSQL vous permet d’intégrer des données Azure Cosmos DB avec le reste de vos données dans Microsoft Fabric.
Azure Log Analytics. Les données de journal d’application collectées par Azure Monitor sont stockées dans un espace de travail Log Analytics. Il est possible d’utiliser des requêtes Log Analytics pour analyser et visualiser les métriques et inspecter les messages de journal pour identifier les problèmes au sein de l’application.
Détails du scénario
Scénario : Une compagnie de taxis collecte des données sur chaque trajet effectué en taxi. Pour ce scénario, nous partons du principe que deux périphériques distincts envoient des données. Le taxi est équipé d’un compteur qui envoie les informations suivantes sur chaque course : durée, distance et lieux de prise en charge et de dépose. Un autre périphérique accepte les paiements des clients et envoie des données sur les tarifs. Afin de déterminer les tendances des usagers, la compagnie de taxis souhaite calculer le pourboire moyen par mile parcouru, en temps réel, pour chaque quartier.
Ingestion de données
Pour simuler une source de données, cette architecture de référence utilise les données des taxis de la ville de New York[1]. Ce jeu de données contient des données sur les courses de taxis à New York sur une période de quatre ans (2010 – 2013). Il contient deux types d’enregistrement : les données sur les courses et les données sur les tarifs. Les premières incluent la durée du trajet, la distance et les lieux de prise en charge et de dépose. Les secondes incluent le montant des tarifs des courses, des taxes et des pourboires. Les champs communs aux deux types d’enregistrement sont le numéro de médaillon (« taxi jaune »), le permis spécial et l’ID fournisseur. Ensemble ces trois champs identifient un taxi ainsi qu’un chauffeur. Les données sont stockées au format CSV.
[1] Donovan, Brian; Work, Dan (2016) : Données de trajet des taxis de New York (2010-2013). Université de l’Illinois, Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Le générateur de données est une application .NET Core qui lit les enregistrements et les envoie à Azure Event Hubs. Le générateur envoie les données des courses au format JSON et les données relatives aux tarifs au format CSV.
Le service Event Hubs utilise des partitions pour segmenter les données. Ce système de partition permet à un consommateur de lire chaque partition en parallèle. Lorsque vous envoyez des données à Event Hubs, vous pouvez spécifier explicitement la clé de partition. Sinon, les enregistrements sont affectés aux partitions de manière alternée.
Dans ce scénario, les données de courses et de tarifs doivent se retrouver avec le même ID de partition pour un taxi donné. Cela permet à Databricks d’appliquer un degré de parallélisme quand il met en corrélation les deux flux. Un enregistrement dans la partition n des données des courses correspond à un enregistrement de la partition n des données relatives aux tarifs.
Téléchargez un fichier Visio de cette architecture.
Dans le générateur de données, le modèle de données commun pour les deux types d’enregistrement comprend une propriété PartitionKey
, qui est la concaténation de Medallion
, HackLicense
et VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Cette propriété est utilisée pour fournir une clé de partition explicite lors de l’envoi des données vers Event Hubs :
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
La capacité de débit du service Event Hubs est mesurée par les unités de débit. Vous pouvez mettre automatiquement à l’échelle un Event Hub en activant l’augmentation automatique, qui ajuste automatiquement les unités de débit en fonction du trafic, jusqu’à la limite configurée.
Traitement des flux de données
Dans Azure Databricks, le traitement des données est assuré par un travail. Le travail est attribué à un cluster, qui en assure l’exécution. Le travail peut être du code personnalisé écrit en Java ou un notebook Spark.
Dans cette architecture de référence, le travail est une archive Java avec des classes écrites en Java et Scala. Au moment de spécifier l’archive Java pour un travail Databricks, la classe doit être spécifiée en vue d’une exécution par le cluster Databricks. Ici, la méthode main
de la classe com.microsoft.pnp.TaxiCabReader
contient la logique de traitement des données.
Lecture du flux à partir des deux instances de hub d’événements
La logique de traitement des données utilise Spark Structured Streaming pour lire dans les deux instances de hub d’événements Azure :
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Enrichissement des données avec les informations de quartiers
Les données de courses comprennent les coordonnées de latitude et de longitude des lieux de prise en charge et de dépôt des clients. Même si ces coordonnées sont utiles, elles ne sont pas facilement exploitables pour l’analyse. Par conséquent, ces données sont enrichies avec les données de quartiers lues dans un fichier de forme.
De format binaire, le fichier de forme ne s’analyse pas facilement, mais la bibliothèque GeoTools propose des outils pour les données géospatiales qui utilisent le format de fichier de forme. Cette bibliothèque est utilisée dans la classe com.microsoft.pnp.GeoFinder
pour déterminer le nom du quartier en fonction des coordonnées de prise en charge et de dépôt.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Jonction des données de courses et de tarifs
Dans un premier temps, les données de courses et de tarifs sont transformées :
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Les données de courses sont ensuite jointes aux données de tarifs :
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Traitement des données et insertion dans Azure Cosmos DB
Le montant du tarif moyen pour chaque quartier est calculé pour un intervalle de temps donné :
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Il est ensuite inséré dans Azure Cosmos DB :
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Considérations
Ces considérations implémentent les piliers d’Azure Well-Architected Framework qui est un ensemble de principes directeurs qui permettent d’améliorer la qualité d’une charge de travail. Pour plus d’informations, consultez Microsoft Azure Well-Architected Framework.
Sécurité
La sécurité fournit des garanties contre les attaques délibérées, et contre l’utilisation abusive de vos données et systèmes importants. Pour plus d’informations, consultez liste de vérification de la révision de conception pour security.
L’accès à l’espace de travail Azure Databricks est contrôlé à l’aide de la console d’administration La console administrateur inclut des fonctionnalités permettant d’ajouter des utilisateurs, de gérer les autorisations utilisateur et de configurer l’authentification unique. Le contrôle d’accès pour les espaces de travail, les clusters, les travaux et les tables peut aussi être défini via la console administrateur.
Gestion des secrets
Azure Databricks inclut un secret store qui est utilisé pour stocker les informations d'identification et les référencer dans les notebooks et les tâches. Les secrets contenus dans le magasin des secrets Azure Databricks sont partitionnés par étendues (scope) :
databricks secrets create-scope --scope "azure-databricks-job"
Les secrets sont ajoutés au niveau de l’étendue :
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Remarque
Une étendue basée sur Azure Key Vault doit être utilisée à la place de l'étendue native Azure Databricks. Pour en savoir plus, consultez la page relative aux étendues adossées à Azure Key Vault.
Dans le code, les secrets sont accessibles via les utilitaires de secrets Azure Databricks.
Optimisation des coûts
L’optimisation des coûts consiste à réduire les dépenses inutiles et à améliorer l’efficacité opérationnelle. Pour plus d’informations, consultez liste de vérification de la révision de conception pour l’optimisation des coûts.
Utiliser la calculatrice de prix Azure pour estimer les coûts. Les considérations suivantes s'appliquent aux services utilisés dans cette architecture de référence.
Considérations relatives aux coûts Event Hubs
Cette architecture de référence déploie Event Hubs au niveau Standard. Le modèle de tarification repose sur les unités de débit, les événements d'entrée et les événements de capture. Un événement d'entrée est une unité de données de 64 Ko ou moins. Les messages plus volumineux sont facturés par multiples de 64 Ko. Vous pouvez spécifier les unités de débit par le biais du portail Azure ou des API de gestion Event Hubs.
Si vous avez besoin de plus de jours de conservation, envisagez le niveau Dedicated. Ce niveau offre des déploiements à un seul locataire répondant aux exigences les plus strictes. Cette offre constitue un cluster basé sur les unités de capacité (CU) qui n'est pas liée par des unités de débit.
Le niveau Standard est également facturé sur la base des événements d'entrée et des unités de débit.
Pour plus d'informations sur la tarification d'Event Hubs, consultez la rubrique Tarification d'Event Hubs.
Considérations relatives aux coûts Azure Databricks
Azure Databricks offre deux niveaux, Standard et Premium, chacun prenant en charge trois charges de travail. Cette architecture de référence déploie l'espace de travail Azure Databricks au niveau Premium.
Les charges de travail d’ingénierie des données doivent s’exécuter sur un cluster de travaux et sont destinées aux ingénieurs de données à créer et à exécuter des travaux. Les charges de travail d’analytique des données doivent s’exécuter sur un cluster à usage unique et sont destinées aux scientifiques des données pour explorer, visualiser, manipuler et partager des données et des insights de manière interactive.
Azure Databricks propose de nombreux modèles de tarification.
Plan de paiement à l'utilisation
Vous êtes facturé pour les machines virtuelles configurées en clusters et Unités Databricks (DBU) en fonction de l'instance de machine virtuelle sélectionnée. Une DBU est une unité de capacité de traitement facturée sur la base d'une utilisation par seconde. La consommation de DBU dépend de la taille et du type d'instance qui exécute Azure Databricks. La tarification dépendra de la charge de travail et du niveau sélectionnés.
Plan de pré-achat
Vous vous engagez sur Azure Databricks Units (DBU) en tant qu’unités de validation Databricks (DBCU) pendant un ou trois ans pour réduire le coût total de possession au cours de cette période par rapport au modèle de paiement à l’utilisation.
Pour plus d'informations, consultez Tarification d'Azure Databricks.
Considérations relatives aux coûts d’Azure Cosmos DB
Dans cette architecture, une série d’enregistrements est écrite dans Azure Cosmos DB par le travail Azure Databricks. Vous êtes facturé en fonction de la capacité que vous réservez, exprimée en unités de requête par seconde (RU/s), et qui est utilisée pour effectuer des opérations d'insertion. L'unité de facturation est de 100 RU/s par heure. Par exemple, le coût d'écriture d'éléments de 100 Ko est de 50 RU/s.
Pour les opérations d'écriture, configurez une capacité suffisante pour prendre en charge le nombre d'écritures requises par seconde. Vous pouvez augmenter le débit configuré à l'aide du portail ou d'Azure CLI avant d'effectuer les opérations d'écriture, puis réduire le débit une fois les écritures terminées. Le débit relatif à la période d'écriture correspond au débit minimal nécessaire pour les données spécifiées, auquel s'ajoute le débit nécessaire pour l'opération d'insertion, en supposant qu'aucune autre charge de travail n'est en cours d'exécution.
Exemple d’analyse des coûts
Supposons que vous configuriez une valeur de débit de 1000 RU/s sur un conteneur. Celui-ci est déployé 24 heures sur 24 pendant 30 jours, soit un total de 720 heures.
Le conteneur est facturé à 10 unités de 100 RU/s par heure pour chaque heure. 10 unités à 0,008 USD (100 RU/s par heure) sont facturées à 0,08 USD par heure.
Pour 720 heures ou 7200 unités (de 100 RU), vous êtes facturé 57,60 USD pour le mois.
Le stockage est également facturé pour chaque Go utilisé pour vos données et index stockés. Pour plus d’informations, consultez le modèle de tarification Azure Cosmos DB.
Utilisez la calculatrice de capacité Azure Cosmos DB pour obtenir une estimation rapide du coût de la charge de travail.
Excellence opérationnelle
L’excellence opérationnelle couvre les processus d’exploitation qui déploient une application et la conservent en production. Pour plus d’informations, consultez liste de vérification de la révision de conception pour l’excellence opérationnelle.
Surveillance
Azure Databricks est basé sur Apache Spark et tous deux utilisent log4j comme bibliothèque standard pour la journalisation. En plus de la journalisation par défaut fournie par Apache Spark, vous pouvez implémenter une journalisation dans Azure Log Analytics en procédant de la manière décrite dans l’article Supervision d’Azure Databricks.
Comme la classe com.microsoft.pnp.TaxiCabReader
traite les messages relatifs aux courses et aux tarifs, il est possible que l’un des deux soit mal formé et donc non valide. Dans un environnement de production, il est important d’analyser ces messages mal formés pour identifier un problème au niveau des sources de données et le résoudre rapidement pour éviter toute perte de données. La classe com.microsoft.pnp.TaxiCabReader
inscrit un accumulateur Apache Spark qui assure le suivi du nombre d’enregistrements relatifs aux courses et aux tarifs mal formés :
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark utilise la bibliothèque Dropwizard pour envoyer des métriques, et certains champs de métriques Dropwizard natifs sont incompatibles avec Azure Log Analytics. Par conséquent, cette architecture de référence comprend un récepteur et un rapporteur Dropwizard personnalisés. Elle met en forme les métriques dans le format attendu par Azure Log Analytics. Quand Apache Spark transmet des métriques, les métriques personnalisées pour les données de course et de tarif malformées sont aussi envoyées.
Voici des exemples de requêtes que vous pouvez utiliser dans votre espace de travail Azure Log Analytics pour surveiller l’exécution du travail de diffusion en continu. L’argument ago(1d)
dans chaque requête retourne tous les enregistrements générés au cours de la dernière journée. Vous pouvez l’ajuster pour afficher une autre période.
Exceptions journalisées pendant l’exécution de requête de flux
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Accumulation de données de course et de tarif mal formées
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Exécution du travail au fil du temps
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Pour plus d'informations, consultez Supervision d'Azure Databricks.
Organisation et déploiements de ressources
Créez des groupes de ressources distincts pour les environnements de production, de développement et de test. Des groupes de ressources distincts simplifient la gestion des déploiements, la suppression des déploiements de tests et l’attribution des droits d’accès.
Utilisez le modèle Azure Resource Manager pour déployer les ressources Azure en suivant le processus IaC (Infrastructure as Code). Grâce aux modèles, il est plus facile d'automatiser les déploiements à l'aide d'Azure DevOps Services, ou d'autres solutions de CI/CD.
Placez chaque charge de travail dans un modèle de déploiement distinct et stockez les ressources dans des systèmes de contrôle de code source. Vous pouvez déployer les modèles ensemble ou individuellement dans le cadre d'un processus CI/CD pour faciliter le processus d'automatisation.
Dans cette architecture, Azure Event Hubs, Log Analytics et Azure Cosmos DB sont identifiés comme une charge de travail unique. Ces ressources sont incluses dans un même modèle ARM.
Envisagez d'échelonner vos charges de travail. Déployez à différentes étapes et effectuez des vérifications de la validation à chaque étape avant de passer à l'étape suivante. Vous pourrez ainsi envoyer (push) les mises à jour de vos environnements de production de manière hautement contrôlée et limiter les problèmes de déploiement imprévus.
Cette architecture comporte plusieurs étapes de déploiement. Envisagez de créer un pipeline Azure DevOps et d'ajouter ces étapes. Voici quelques exemples d'étapes que vous pouvez automatiser :
- Démarrer un cluster Databricks
- Configurer l'interface de ligne de commande Databricks
- Installer les outils Scala
- Ajouter les secrets Databricks
Envisagez également de créer des tests d'intégration automatisés pour améliorer la qualité et la fiabilité du code Databricks et de son cycle de vie.
Déployer ce scénario
Pour déployer et exécuter l’implémentation de référence, suivez les étapes du fichier Readme de GitHub.