Questa architettura di riferimento illustra una pipeline di elaborazione di flussi end-to-end. Le quattro fasi di questa pipeline sono inserimento, elaborazione, archiviazione e analisi e report. Per questa architettura di riferimento, la pipeline inserisce i dati da due origini, esegue un join in record correlati da ogni flusso, arricchisce il risultato e calcola una media in tempo reale. I risultati vengono quindi archiviati per ulteriori analisi.
Un'implementazione di riferimento per questa architettura è disponibile in GitHub.
Architettura
Scarica un file Visio di questa architettura.
Workflow
Il flusso di dati seguente corrisponde al diagramma precedente:
In questa architettura sono presenti due origini dati che generano flussi di dati in tempo reale. Il primo flusso contiene informazioni sulle corse e il secondo flusso contiene informazioni sulle tariffe. L'architettura di riferimento include un generatore di dati simulato che legge da un set di file statici ed esegue il push dei dati in Hub eventi di Azure. Le origini dati in un'applicazione reale sono dispositivi installati nei taxi.
hub eventi è un servizio di inserimento eventi. Questa architettura usa due istanze di Hub eventi, una per ogni origine dati. Ogni origine dati invia un flusso di dati all'istanza associata di Hub eventi.
azure Databricks è una piattaforma di analisi basata su Apache Spark ottimizzata per la piattaforma dei servizi cloud di Microsoft Azure. Azure Databricks viene usato per correlare i dati relativi alle corse e alle tariffe dei taxi e per arricchire i dati correlati con i dati di quartiere archiviati nel file system di Azure Databricks.
azure Cosmos DB è un servizio di database a più modelli completamente gestito. L'output di un processo di Azure Databricks è una serie di record, che vengono scritti in Azure Cosmos DB per Apache Cassandra. Azure Cosmos DB per Apache Cassandra viene usato perché supporta la modellazione dei dati delle serie temporali.
collegamento ad Azure Synapse per Azure Cosmos DB consente di eseguire analisi quasi in tempo reale sui dati operativi in Azure Cosmos DB, senza alcun effetto sulle prestazioni o sui costi per il carico di lavoro transazionale. È possibile ottenere questi risultati usando pool SQL serverless e pool spark . Questi motori di analisi sono disponibili nell'area di lavoro di Azure Synapse Analytics.
il mirroring di Azure Cosmos DB per NoSQL in Microsoft Fabric consente di integrare i dati di Azure Cosmos DB con il resto dei dati in Microsoft Fabric.
log analytics è uno strumento in Monitoraggio di Azure che consente di eseguire query e analizzare i dati di log da varie origini. I dati del log applicazioni raccolti monitoraggio di Azure vengono archiviati in un'area di lavoro Log Analytics . È possibile usare le query di Log Analytics per analizzare e visualizzare le metriche ed esaminare i messaggi di log per identificare i problemi all'interno dell'applicazione.
Dettagli dello scenario
Una società di taxi raccoglie i dati relativi a ogni corsa in taxi. Per questo scenario si presuppone che due dispositivi separati inviino dati. Il taxi ha un contatore che invia informazioni su ogni corsa, tra cui la durata, la distanza e le posizioni di ritiro e consegna. Un dispositivo separato accetta i pagamenti dai clienti e invia dati sui prezzi delle corse. Per individuare le tendenze dei piloti, l'azienda di taxi vuole calcolare la mancia media per miglio guidata per ogni quartiere, in tempo reale.
Inserimento dati
Per simulare un'origine dati, questa architettura di riferimento usa il set di dati dei taxi di New York City 1. Questo set di dati contiene dati sulle corse in taxi a New York Dal 2010 al 2013. Contiene sia record di dati relativi alle corse che alle tariffe. I dati sulle corse includono durata delle corse, distanza di viaggio e località di ritiro e consegna. I dati relativi ai costi della corsa includono gli importi relativi a costo di base, imposte e mancia. I campi in entrambi i tipi di record includono numero di medaglia, licenza hack e ID fornitore. La combinazione di questi tre campi identifica in modo univoco un taxi e un conducente. I dati vengono archiviati in formato CSV.
[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Il generatore di dati è un'applicazione .NET Core che legge i record e li invia a Hub eventi. Il generatore invia i dati relativi alle corse in formato JSON e i dati relativi ai costi in formato CSV.
Hub eventi usa partizioni per segmentare i dati. Le partizioni consentono a un consumer di leggere ogni partizione in parallelo. Quando si inviano dati a Hub eventi, è possibile specificare direttamente la chiave di partizione. In caso contrario, i record vengono assegnati alle partizioni in modalità round-robin.
In questo scenario, ai dati relativi alle corse e alle tariffe deve essere assegnato lo stesso ID di partizione per un taxi specifico. Questa assegnazione consente a Databricks di applicare un grado di parallelismo quando correla i due flussi. Ad esempio, un record nella partizione n dei dati delle corse corrisponde a un record nella partizione n dei dati delle tariffe.
Scaricare un file di Visio di questa architettura.
Nel generatore di dati il modello di dati comune per entrambi i tipi di record ha una proprietà PartitionKey
che corrisponde alla concatenazione di Medallion
, HackLicense
e VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Questa proprietà viene usata per fornire una chiave di partizione esplicita quando invia dati a Hub eventi.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Hub eventi di
La capacità di elaborazione di Hub eventi viene misurata in unità elaborate. È possibile ridimensionare automaticamente un hub eventi abilitando aumento automatico. Questa funzionalità ridimensiona automaticamente le unità elaborate in base al traffico, fino a un massimo configurato.
Elaborazione dei flussi
In Azure Databricks un processo esegue l'elaborazione dei dati. Il processo viene assegnato a un cluster e quindi viene eseguito su di esso. Il processo può essere codice personalizzato scritto in Java o in un notebook spark .
In questa architettura di riferimento il processo è un archivio Java con classi scritte in Java e Scala. Quando si specifica l'archivio Java per un processo di Databricks, il cluster Databricks specifica la classe per l'operazione. In questo caso, il main
metodo della classe contiene la logica di com.microsoft.pnp.TaxiCabReader
elaborazione dei dati.
Leggere il flusso dalle due istanze dell'hub eventi
La logica di elaborazione dei dati usa lo streaming strutturato Spark per leggere dalle due istanze dell'hub eventi di Azure:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Arricchire i dati con le informazioni sul quartiere
I dati sulle corse includono le coordinate di latitudine e longitudine delle posizioni di ritiro e consegna. Queste coordinate sono utili ma non facilmente utilizzate per l'analisi. Di conseguenza, questi dati vengono arricchiti con i dati di quartiere letti da un file di forma .
Il formato del file di forma è binario e non è facile da analizzare. Tuttavia, la libreria GeoTools fornisce strumenti per i dati geospaziali che usano il formato shapefile. Questa libreria viene usata nella classe com.microsoft.pnp.GeoFinder
per determinare il nome del quartiere in base alle coordinate per le posizioni di ritiro e consegna.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Partecipare ai dati relativi alle corse e alle tariffe
Prima di tutto i dati su corse e tariffe vengono trasformati:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
I dati delle corse vengono quindi uniti ai dati relativi alle tariffe:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Elaborare i dati e inserirli in Azure Cosmos DB
L'importo medio della tariffa per ogni quartiere viene calcolato per un intervallo di tempo specifico:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
L'importo medio della tariffa viene quindi inserito in Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Considerazioni
Queste considerazioni implementano i pilastri di Azure Well-Architected Framework, che è un set di principi guida che possono essere usati per migliorare la qualità di un carico di lavoro. Per altre informazioni, vedere Microsoft Azure Well-Architected Framework.
Sicurezza
La sicurezza offre garanzie contro attacchi intenzionali e l'uso improprio dei dati e dei sistemi preziosi. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per Security.
L'accesso all'area di lavoro di Azure Databricks viene controllato usando la console di amministrazione . La console di amministrazione include funzionalità per aggiungere utenti, gestire le autorizzazioni utente e configurare l'accesso Single Sign-On. La console permette anche di impostare il controllo di accesso ad aree di lavoro, cluster, processi e tabelle.
Gestire i segreti
Azure Databricks include un archivio segreto usato per archiviare le credenziali e farvi riferimento in notebook e processi. Definire l'ambito dei segreti di partizione all'interno dell'archivio segreto di Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
I segreti vengono aggiunti a livello ambito:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Nel codice, i segreti sono accessibili grazie alle utilità dei segreti di Azure Databricks.
Ottimizzazione costi
L'ottimizzazione dei costi è incentrata sui modi per ridurre le spese non necessarie e migliorare l'efficienza operativa. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per l'ottimizzazione dei costi.
Usare il calcolatore dei prezzi di Azure per stimare i costi. Considerare i servizi seguenti usati in questa architettura di riferimento.
Considerazioni sul costo di Hub eventi
Questa architettura di riferimento distribuisce Hub eventi nel livello Standard. Il modello di determinazione dei prezzi si basa su unità elaborate, eventi in ingresso ed eventi di acquisizione. Un evento in ingresso è un'unità di dati con dimensioni inferiori o inferiori a 64 KB. I messaggi di dimensioni maggiori sono fatturati in multipli di 64 KB. È possibile specificare le unità elaborate tramite le API di gestione di hub eventi o di portale di Azure.
Se sono necessari più giorni di conservazione, prendere in considerazione il livello Dedicato. Questo livello offre distribuzioni a tenant singolo con requisiti rigorosi. Questa offerta crea un cluster basato su unità di capacità e non dipende dalle unità elaborate. Il livello Standard viene fatturato anche in base agli eventi in ingresso e alle unità elaborate.
Per altre informazioni, vedere prezzi di Hub eventi.
Considerazioni sul costo di Azure Databricks
Azure Databricks offre il livello Standard e il livello Premium, entrambi supportati da tre carichi di lavoro. Questa architettura di riferimento distribuisce un'area di lavoro di Azure Databricks nel livello Premium.
I carichi di lavoro di progettazione dei dati devono essere eseguiti in un cluster di processi. I data engineer usano cluster per compilare ed eseguire processi. I carichi di lavoro di analisi dei dati devono essere eseguiti in un cluster all-purpose e sono destinati ai data scientist per esplorare, visualizzare, modificare e condividere dati e informazioni dettagliate in modo interattivo.
Azure Databricks offre più modelli di prezzi.
piano con pagamento in base al consumo
Viene addebitato il provisioning di macchine virtuali (VM) nei cluster e nelle unità di Azure Databricks (DBU) in base all'istanza di macchina virtuale scelta. Una DBU è un'unità di funzionalità di elaborazione fatturata per utilizzo al secondo. L'utilizzo di DBU dipende dalle dimensioni e dal tipo di istanza in esecuzione in Azure Databricks. I prezzi dipendono dal carico di lavoro e dal livello scelti.
piano di pre-acquisto
Si esegue il commit in unità di database come unità di commit di Azure Databricks per uno o tre anni per ridurre il costo totale di proprietà in tale periodo di tempo rispetto al modello con pagamento in base al consumo.
Per altre informazioni, vedere prezzi di Azure Databricks.
Considerazioni sul costo di Azure Cosmos DB
In questa architettura, il processo di Azure Databricks scrive una serie di record in Azure Cosmos DB. Viene addebitata la capacità che si riserva, misurata in unità richiesta al secondo (UR/sec). Questa capacità viene usata per eseguire operazioni di inserimento. L'unità per la fatturazione è di 100 UR/sec all'ora. Ad esempio, il costo di scrittura di elementi da 100 KB è di 50 UR/sec.
Per le operazioni di scrittura, predisporre una capacità sufficiente a supportare il numero di scritture necessarie al secondo. È possibile aumentare la velocità effettiva con provisioning usando il portale o l'interfaccia della riga di comando di Azure prima di eseguire operazioni di scrittura e quindi ridurre la velocità effettiva al termine di tali operazioni. La velocità effettiva per il periodo di scrittura è la somma della velocità effettiva minima necessaria per i dati specifici e la velocità effettiva necessaria per l'operazione di inserimento. Questo calcolo presuppone che non sia in esecuzione alcun altro carico di lavoro.
Analisi dei costi di esempio
Si supponga di configurare un valore di velocità effettiva di 1.000 UR/sec in un contenitore. Viene distribuito per 24 ore per 30 giorni, per un totale di 720 ore.
Il contenitore viene fatturato a 10 unità di 100 UR/sec all'ora per ogni ora. Dieci unità a $0,008 (per 100 UR/sec all'ora) vengono addebitate a $ 0,08 all'ora.
Per 720 ore o 7.200 unità (di 100 UR), viene fatturato $ 57,60 per il mese.
L'archiviazione viene fatturata anche per ogni GB usato per i dati archiviati e l'indice. Per altre informazioni, vedere Modello di prezzi di Azure Cosmos DB.
Usare il calcolatore della capacità di Azure Cosmos DB per una rapida stima del costo del carico di lavoro.
Eccellenza operativa
L'eccellenza operativa copre i processi operativi che distribuiscono un'applicazione e lo mantengono in esecuzione nell'ambiente di produzione. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per l'eccellenza operativa.
Monitoraggio
Azure Databricks si basa su Apache Spark. Sia Azure Databricks che Apache Spark usano Apache Log4j come libreria standard per la registrazione. Oltre alla registrazione predefinita fornita da Apache Spark, è possibile implementare la registrazione in Log Analytics. Per ulteriori informazioni, vedere Monitoraggio di Azure Databricks.
Poiché la classe com.microsoft.pnp.TaxiCabReader
elabora i messaggi di corsa e di tariffa, un messaggio potrebbe non essere valido e pertanto non valido. In un ambiente di produzione, è importante analizzare questi messaggi in formato non valido per identificare un problema con le origini dati in modo che possa essere risolto rapidamente per evitare la perdita di dati. La classe com.microsoft.pnp.TaxiCabReader
registra un Apache Spark Enumerat che tiene traccia del numero di record di tariffe non validi e record di corsa:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark usa la libreria Dropwizard per inviare le metriche. Alcuni campi delle metriche dropwizard nativi non sono compatibili con Log Analytics, motivo per cui questa architettura di riferimento include un sink dropwizard personalizzato e un reporter. Formatta le metriche nel formato previsto da Log Analytics. Quando Apache Spark riporta le metriche, vengono inviate anche le metriche personalizzate per i dati di corse e tariffe in formato non valido.
È possibile usare le query di esempio seguenti nell'area di lavoro Log Analytics per monitorare il funzionamento del processo di streaming. L'argomento ago(1d)
in ogni query restituisce tutti i record generati nell'ultimo giorno. È possibile modificare questo parametro per visualizzare un periodo di tempo diverso.
Eccezioni registrate durante l'operazione di query di flusso
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Accumulo di dati su tariffe e corse in formato non valido
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operazione del processo nel tempo
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organizzazione delle risorse e distribuzioni
Creare gruppi di risorse separati per gli ambienti di produzione, sviluppo e test. L'uso di gruppi di risorse separati semplifica la gestione delle distribuzioni, l'eliminazione delle distribuzioni di test e l'assegnazione dei diritti di accesso.
Usare il modello di Azure Resource Manager per distribuire le risorse di Azure in base al processo di infrastruttura come codice. Usando i modelli, è possibile automatizzare facilmente le distribuzioni con servizi Azure DevOps o altre soluzioni di integrazione continua e recapito continuo (CI/CD).
Inserire ogni carico di lavoro in un modello di distribuzione separato e archiviare le risorse nei sistemi di controllo del codice sorgente. È possibile distribuire i modelli insieme o separatamente come parte di un processo CI/CD. Questo approccio semplifica il processo di automazione.
In questa architettura, Hub eventi, Log Analytics e Azure Cosmos DB vengono identificati come un singolo carico di lavoro. Queste risorse sono incluse in un singolo modello di Azure Resource Manager.
Prendere in considerazione la gestione temporanea dei carichi di lavoro. Eseguire la distribuzione in varie fasi ed eseguire i controlli di convalida in ogni fase prima di passare alla fase successiva. In questo modo è possibile controllare come eseguire il push degli aggiornamenti agli ambienti di produzione e ridurre al minimo i problemi di distribuzione imprevisti.
In questa architettura sono presenti più fasi di distribuzione. Prendere in considerazione la creazione di una pipeline di Azure DevOps e l'aggiunta di tali fasi. È possibile automatizzare le fasi seguenti:
- Avviare un cluster Databricks.
- Configurare l'interfaccia della riga di comando di Databricks.
- Installare gli strumenti Scala.
- Aggiungere i segreti di Databricks.
Prendere in considerazione la scrittura di test di integrazione automatizzati per migliorare la qualità e l'affidabilità del codice databricks e del relativo ciclo di vita.
Distribuire lo scenario
Per distribuire ed eseguire l'implementazione di riferimento, seguire la procedura descritta nel file leggimi di GitHub .