Cette architecture de référence présente un pipeline de traitement de flux de bout en bout. Les quatre étapes de ce pipeline sont l’ingestion, le processus, le stockage et l’analyse et le rapport. Pour cette architecture de référence, le pipeline ingère des données issues de deux sources, effectue une jonction sur les enregistrements apparentés de chaque flux, enrichit le résultat et calcule une moyenne en temps réel. Les résultats sont ensuite stockés pour une analyse plus approfondie.
Une implémentation de référence de cette architecture est disponible sur GitHub.
Architecture
Téléchargez un fichier Visio de cette architecture.
Workflow
Le flux de données suivant correspond au diagramme précédent :
Dans cette architecture, deux sources de données génèrent des flux de données en temps réel. Le premier flux contient des informations sur les courses, et le deuxième flux contient des informations sur les tarifs. L’architecture de référence inclut un générateur de données simulé qui lit à partir d’un ensemble de fichiers statiques et envoie les données à Azure Event Hubs. Les sources de données d’une application réelle sont des appareils installés dans les taxis.
Event Hubs est un service d’ingestion d’événements. Cette architecture utilise deux instances d’Event Hub, à savoir une par source de données. Chaque source de données envoie un flux de données à l’Event Hub associé.
Azure Databricks est une plateforme d’analytique basée sur Apache Spark optimisée pour la plateforme de services cloud Microsoft Azure. Azure Databricks est utilisé pour mettre en corrélation les données de trajet et de tarifs de taxi et pour enrichir les données corrélées avec les données de voisinage stockées dans le système de fichiers Azure Databricks.
Azure Cosmos DB est un service de base de données multimodélise entièrement managé. La sortie d’un travail Azure Databricks est une série d’enregistrements écrits dans Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra est utilisé, car il prend en charge la modélisation des données de série chronologique.
Azure Synapse Link pour Azure Cosmos DB vous permet d’exécuter des analyses en quasi-temps réel sur des données opérationnelles dans Azure Cosmos DB, sans aucun impact sur les performances ou les coûts sur votre charge de travail transactionnelle. Vous pouvez obtenir ces résultats en utilisant pool SQL serverless et pools Spark. Ces moteurs d’analyse sont disponibles à partir de votre espace de travail Azure Synapse Analytics.
mise en miroir d’Azure Cosmos DB pour NoSQL dans Microsoft Fabric vous permet d’intégrer des données Azure Cosmos DB avec le reste de vos données dans Microsoft Fabric.
Log Analytics est un outil dans Azure Monitor qui vous permet d’interroger et d’analyser les données de journal à partir de différentes sources. Les données du journal des applications collectées Azure Monitor sont stockées dans un espace de travail Log Analytics . Vous pouvez utiliser des requêtes Log Analytics pour analyser et visualiser les métriques et inspecter les messages de journal pour identifier les problèmes au sein de l’application.
Détails du scénario
Une compagnie de taxi collecte des données sur chaque trajet en taxi. Pour ce scénario, nous partons du principe que deux appareils distincts envoient des données. Le taxi dispose d’un compteur qui envoie des informations sur chaque trajet, y compris la durée, la distance et l’enlèvement et les emplacements de dépôt. Un autre périphérique accepte les paiements des clients et envoie des données sur les tarifs. Pour repérer les tendances des coureurs, la compagnie de taxi veut calculer le pourboire moyen par mile piloté pour chaque quartier, en temps réel.
Ingestion de données
Pour simuler une source de données, cette architecture de référence utilise le jeu de données de données de taxi New York City1. Ce jeu de données contient des données sur les trajets de taxi à New York de 2010 à 2013. Il contient des enregistrements de données sur les courses et les tarifs. Les données de trajet incluent la durée du trajet, la distance de trajet et les emplacements de prise en charge et de dépôt. Les secondes incluent le montant des tarifs des courses, des taxes et des pourboires. Les champs des deux types d’enregistrements incluent le numéro de médaillon, la licence hack et l’ID du fournisseur. La combinaison de ces trois champs identifie de façon unique un taxi et un chauffeur. Les données sont stockées au format CSV.
[1] Donovan, Brian; Work, Dan (2016) : Données de trajet des taxis de New York (2010-2013). Université de l’Illinois, Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Le générateur de données est une application .NET Core qui lit les enregistrements et les envoie à Event Hubs. Le générateur envoie les données des courses au format JSON et les données relatives aux tarifs au format CSV.
Le service Event Hubs utilise des partitions pour segmenter les données. Ce système de partition permet à un consommateur de lire chaque partition en parallèle. Lorsque vous envoyez des données à Event Hubs, vous pouvez spécifier directement la clé de partition. Sinon, les enregistrements sont affectés aux partitions de manière alternée.
Dans ce scénario, les données de trajet et les données de tarif doivent être affectées au même ID de partition pour un taxi spécifique. Cette affectation permet à Databricks d’appliquer un degré de parallélisme lorsqu’il met en corrélation les deux flux. Par exemple, un enregistrement dans la partition n des données de trajet correspond à un enregistrement dans la partition n des données tarifaires.
Téléchargez un fichier Visio de cette architecture.
Dans le générateur de données, le modèle de données commun pour les deux types d’enregistrement comprend une propriété PartitionKey
, qui est la concaténation de Medallion
, HackLicense
et VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Cette propriété est utilisée pour fournir une clé de partition explicite lorsqu’elle envoie des données à Event Hubs.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
La capacité de débit du service Event Hubs est mesurée par les unités de débit. Vous pouvez mettre à l’échelle automatiquement un hub d’événements en activant automatiquement. Cette fonctionnalité met automatiquement à l’échelle les unités de débit en fonction du trafic, jusqu’à un maximum configuré.
Traitement des flux de données
Dans Azure Databricks, un travail effectue le traitement des données. Le travail est affecté à un cluster, puis s’exécute dessus. Le travail peut être du code personnalisé écrit en Java ou un notebook Spark .
Dans cette architecture de référence, le travail est une archive Java qui contient des classes écrites en Java et Scala. Lorsque vous spécifiez l’archive Java pour un travail Databricks, le cluster Databricks spécifie la classe pour l’opération. Ici, la méthode main
de la classe com.microsoft.pnp.TaxiCabReader
contient la logique de traitement des données.
Lire le flux à partir des deux instances event Hub
La logique de traitement des données utilise Spark Structured Streaming pour lire dans les deux instances de hub d’événements Azure :
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Enrichir les données avec les informations de voisinage
Les données de trajet incluent les coordonnées de latitude et de longitude des emplacements d’enlèvement et de dépôt. Ces coordonnées sont utiles, mais pas facilement consommées pour l’analyse. Par conséquent, ces données sont enrichies avec des données de voisinage lues à partir d’un fichier de forme .
Le format du fichier de forme est binaire et n’est pas facilement analysé. Mais la bibliothèque GeoTools fournit des outils pour les données géospatiales qui utilisent le format de fichier de forme. Cette bibliothèque est utilisée dans la classe com.microsoft.pnp.GeoFinder
pour déterminer le nom du quartier en fonction des coordonnées des emplacements d’enlèvement et de dépôt.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Joindre les données de trajet et de prix
Dans un premier temps, les données de courses et de tarifs sont transformées :
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Ensuite, les données de trajet sont jointes aux données tarifaires :
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Traiter les données et les insérer dans Azure Cosmos DB
Le montant moyen des tarifs pour chaque quartier est calculé pour un intervalle de temps spécifique :
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Le montant moyen du tarif est ensuite inséré dans Azure Cosmos DB :
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Considérations
Ces considérations implémentent les piliers d’Azure Well-Architected Framework, un ensemble de principes directeurs que vous pouvez utiliser pour améliorer la qualité d’une charge de travail. Pour plus d’informations, consultez Microsoft Azure Well-Architected Framework.
Sécurité
La sécurité offre des garanties contre les attaques délibérées et l’utilisation abusive de vos données et systèmes précieux. Pour plus d’informations, consultez liste de vérification de la révision de conception pour security.
L’accès à l’espace de travail Azure Databricks est contrôlé à l’aide de la console administrateur . La console administrateur inclut des fonctionnalités permettant d’ajouter des utilisateurs, de gérer les autorisations utilisateur et de configurer l’authentification unique. Le contrôle d’accès pour les espaces de travail, les clusters, les travaux et les tables peut aussi être défini via la console administrateur.
Gérer les secrets
Azure Databricks inclut un magasin de secrets utilisé pour stocker les informations d’identification et les référencer dans les notebooks et les travaux. Étendue les secrets de partition dans le magasin de secrets Azure Databricks :
databricks secrets create-scope --scope "azure-databricks-job"
Les secrets sont ajoutés au niveau de l’étendue :
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Remarque
Utilisez une étendue azure Key Vault au lieu de l’étendue Azure Databricks native.
Dans le code, les secrets sont accessibles via les utilitaires de secrets Azure Databricks.
Optimisation des coûts
L’optimisation des coûts se concentre sur les moyens de réduire les dépenses inutiles et d’améliorer l’efficacité opérationnelle. Pour plus d’informations, consultez liste de vérification de la révision de conception pour l’optimisation des coûts.
Utiliser la calculatrice de prix Azure pour estimer les coûts. Tenez compte des services suivants utilisés dans cette architecture de référence.
Considérations relatives aux coûts Event Hubs
Cette architecture de référence déploie Event Hubs dans le niveau Standard. Le modèle de tarification repose sur les unités de débit, les événements d'entrée et les événements de capture. Un événement d’entrée est une unité de données de 64 Ko ou moins. Les messages plus volumineux sont facturés par multiples de 64 Ko. Vous pouvez spécifier les unités de débit par le biais du portail Azure ou des API de gestion Event Hubs.
Si vous avez besoin de jours de rétention supplémentaires, tenez compte du niveau Dédié. Ce niveau fournit des déploiements à locataire unique qui ont des exigences strictes. Cette offre génère un cluster basé sur des unités de capacité et ne dépend pas des unités de débit. Le niveau Standard est également facturé en fonction des événements d’entrée et des unités de débit.
Pour plus d’informations, consultez tarification Event Hubs.
Considérations relatives aux coûts Azure Databricks
Azure Databricks fournit le niveau Standard et le niveau Premium, qui prennent tous deux en charge trois charges de travail. Cette architecture de référence déploie un espace de travail Azure Databricks dans le niveau Premium.
Les charges de travail d’ingénierie des données doivent s’exécuter sur un cluster de travaux. Les ingénieurs données utilisent des clusters pour créer et effectuer des travaux. Les charges de travail d’analyse des données doivent s’exécuter sur un cluster à usage unique et sont destinées aux scientifiques des données pour explorer, visualiser, manipuler et partager des données et des insights de manière interactive.
Azure Databricks fournit plusieurs modèles tarifaires.
plan de paiement à l’utilisation
Vous êtes facturé pour les machines virtuelles approvisionnées dans les clusters et les unités Azure Databricks (DBUs) basées sur l’instance de machine virtuelle choisie. Une DBU est une unité de capacité de traitement facturée par utilisation par seconde. La consommation de DBU dépend de la taille et du type d’instance qui s’exécutent dans Azure Databricks. La tarification dépend de la charge de travail et du niveau choisis.
plan de pré-achat
Vous vous engagez à utiliser des unités de validation Azure Databricks pendant un ou trois ans pour réduire le coût total de possession au cours de cette période par rapport au modèle de paiement à l’utilisation.
Pour plus d’informations, consultez tarification d’Azure Databricks.
Considérations relatives aux coûts d’Azure Cosmos DB
Dans cette architecture, le travail Azure Databricks écrit une série d’enregistrements dans Azure Cosmos DB. Vous êtes facturé pour la capacité que vous réservez, qui est mesurée en unités de requête par seconde (RU/s). Cette capacité est utilisée pour effectuer des opérations d’insertion. L’unité de facturation est de 100 RU/s par heure. Par exemple, le coût d'écriture d'éléments de 100 Ko est de 50 RU/s.
Pour les opérations d'écriture, configurez une capacité suffisante pour prendre en charge le nombre d'écritures requises par seconde. Vous pouvez augmenter le débit provisionné à l’aide du portail ou d’Azure CLI avant d’effectuer des opérations d’écriture, puis réduire le débit une fois ces opérations terminées. Votre débit pour la période d’écriture est la somme du débit minimal nécessaire pour les données spécifiques et le débit requis pour l’opération d’insertion. Ce calcul suppose qu’aucune autre charge de travail n’est en cours d’exécution.
Exemple d’analyse des coûts
Supposons que vous configurez une valeur de débit de 1 000 RU/s sur un conteneur. Il est déployé pendant 24 heures pendant 30 jours, pour un total de 720 heures.
Le conteneur est facturé à 10 unités de 100 RU/s par heure pour chaque heure. Dix unités à 0,008 $ (par 100 RU/s par heure) sont facturées à 0,08 $ par heure.
Pendant 720 heures ou 7 200 unités (de 100 RU), vous êtes facturé 57,60 $ pour le mois.
Le stockage est également facturé pour chaque Go utilisé pour vos données stockées et index. Pour plus d’informations, consultez le modèle de tarification Azure Cosmos DB.
Utilisez la calculatrice de capacité Azure Cosmos DB pour une estimation rapide du coût de la charge de travail.
Excellence opérationnelle
L’excellence opérationnelle couvre les processus d’exploitation qui déploient une application et la conservent en production. Pour plus d’informations, consultez liste de vérification de la révision de conception pour l’excellence opérationnelle.
Surveillance
Azure Databricks est basé sur Apache Spark. Azure Databricks et Apache Spark utilisent Apache Log4j comme bibliothèque standard pour la journalisation. Outre la journalisation par défaut fourni par Apache Spark, vous pouvez implémenter la journalisation dans Log Analytics. Pour plus d'informations, consultez Supervision d'Azure Databricks.
À mesure que la classe com.microsoft.pnp.TaxiCabReader
traite les messages de trajet et de prix, un message peut être mal formé et par conséquent non valide. Dans un environnement de production, il est important d’analyser ces messages mal formés pour identifier un problème avec les sources de données afin qu’il puisse être résolu rapidement pour éviter la perte de données. La classe com.microsoft.pnp.TaxiCabReader
inscrit un accumulateur Apache Spark qui suit le nombre d’enregistrements tarifaires incorrects et les enregistrements de courses :
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark utilise la bibliothèque Dropwizard pour envoyer des métriques. Certains des champs de métriques Dropwizard natifs sont incompatibles avec Log Analytics, c’est pourquoi cette architecture de référence inclut un récepteur Dropwizard personnalisé et un reporter. Il met en forme les métriques au format attendu par Log Analytics. Quand Apache Spark transmet des métriques, les métriques personnalisées pour les données de course et de tarif malformées sont aussi envoyées.
Vous pouvez utiliser les exemples de requêtes suivants dans votre espace de travail Log Analytics pour surveiller l’opération du travail de streaming. L’argument ago(1d)
dans chaque requête retourne tous les enregistrements générés le dernier jour. Vous pouvez ajuster ce paramètre pour afficher une période différente.
Exceptions journalisées pendant l’opération de requête de flux
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Accumulation de données de course et de tarif mal formées
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Opération de travail au fil du temps
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organisation et déploiements de ressources
Créez des groupes de ressources distincts pour les environnements de production, de développement et de test. Des groupes de ressources distincts simplifient la gestion des déploiements, la suppression des déploiements de tests et l’attribution des droits d’accès.
Utilisez le modèle Azure Resource Manager pour déployer les ressources Azure en fonction du processus d’infrastructure en tant que code. En utilisant des modèles, vous pouvez facilement automatiser les déploiements avec services Azure DevOps ou d’autres solutions d’intégration continue et de livraison continue (CI/CD).
Placez chaque charge de travail dans un modèle de déploiement distinct et stockez les ressources dans des systèmes de contrôle de code source. Vous pouvez déployer les modèles, ensemble ou individuellement, dans le cadre d’un processus CI/CD. Cette approche simplifie le processus d’automatisation.
Dans cette architecture, Event Hubs, Log Analytics et Azure Cosmos DB sont identifiés comme une charge de travail unique. Ces ressources sont incluses dans un modèle Azure Resource Manager unique.
Envisagez d'échelonner vos charges de travail. Déployez sur différentes étapes et exécutez des vérifications de validation à chaque étape avant de passer à la phase suivante. De cette façon, vous pouvez contrôler la façon dont vous envoyez des mises à jour à vos environnements de production et réduire les problèmes de déploiement imprévus.
Dans cette architecture, il existe plusieurs étapes de déploiement. Envisagez de créer un pipeline Azure DevOps et d’ajouter ces étapes. Vous pouvez automatiser les étapes suivantes :
- Démarrez un cluster Databricks.
- Configurez l’interface CLI Databricks.
- Installez les outils Scala.
- Ajoutez les secrets Databricks.
Envisagez d’écrire des tests d’intégration automatisés pour améliorer la qualité et la fiabilité du code Databricks et de son cycle de vie.
Déployer ce scénario
Pour déployer et exécuter l’implémentation de référence, suivez les étapes décrites dans le fichier lisez-moi GitHub .