Elaborazione di flussi con Azure Databricks

Azure Cosmos DB
Azure Databricks
Hub eventi di Azure
Azure Log Analytics
Monitoraggio di Azure

Questa architettura di riferimento illustra una pipeline di elaborazione di flussi end-to-end. Questo tipo di pipeline include quattro fasi: inserimento, processo, archiviazione, e analisi e creazione di report. Per questa architettura di riferimento, la pipeline inserisce i dati da due origini, esegue un join in record correlati da ogni flusso, arricchisce il risultato e calcola una media in tempo reale. I risultati vengono archiviati per analisi aggiuntive.

Logo di GitHub Un'implementazione di riferimento per questa architettura è disponibile in GitHub.

Architettura

Il diagramma mostra un'architettura di riferimento per l'elaborazione di flussi con Azure Databricks

Scaricare un file di Visio di questa architettura.

Workflow

Questa architettura è costituita dai componenti seguenti:

Origini dati. In questa architettura sono presenti due origini dati che generano flussi di dati in tempo reale. Il primo flusso contiene le informazioni sulla corsa e il secondo contiene le informazioni sui costi delle corse. L'architettura di riferimento include un generatore di dati simulato che legge dati da un set di file statici ed esegue il push dei dati in Hub eventi. Le origini dati in un'applicazione reale corrisponderebbero a dispositivi installati nei taxi.

Hub eventi di Azure. Hub eventi è un servizio di inserimento di eventi. Questa architettura usa due istanze di Hub eventi, una per ogni origine dati. Ogni origine dati invia un flusso di dati all'istanza associata di Hub eventi.

Azure Databricks. Databricks è una piattaforma di analisi basata su Apache Spark ottimizzata per la piattaforma dei servizi cloud di Microsoft Azure. Databricks viene usata per la correlazione dei dati su corse e tariffe dei taxi, nonché per migliorare i dati correlati con i dati sul quartiere archiviati nel file System di Databricks.

Azure Cosmos DB. L'output di un processo di Azure Databricks è una serie di record, che vengono scritti in Azure Cosmos DB per Apache Cassandra. Azure Cosmos DB per Apache Cassandra viene usato perché supporta la modellazione dei dati delle serie temporali.

Azure Log Analytics. I dati del log applicazioni raccolti da Monitoraggio di Azure vengono archiviati in un'area di lavoro Log Analytics. Le query di Log Analytics permettono di analizzare e visualizzare le metriche e ispezionare i messaggi di log allo scopo di identificare i problemi all'interno dell'applicazione.

Dettagli dello scenario

Scenario: una società di taxi raccoglie dati su ogni corsa. Per questo scenario si presuppone che siano presenti due dispositivi diversi che inviano dati. Il taxi ha un tassametro che invia informazioni su ogni corsa la durata, la distanza e le ubicazioni di salita e di discesa del cliente. Un dispositivo separato accetta i pagamenti dai clienti e invia dati sui prezzi delle corse. Per individuare le tendenze dell'utenza, la società di taxi vuole calcolare la mancia media per miglia guidate, in tempo reale, per ogni quartiere.

Inserimento dati

Per simulare un'origine dati, questa architettura di riferimento utilizza il set di dati New York City Taxi Data[1]. Questo set di dati contiene dati relativi alle corse dei taxi a New York City in un periodo di quattro anni (2010-2013). Contiene due tipi di record: i dati relativi alle corse e i dati relativi ai costi delle corse. I dati relativi alle corsa includono la durata della corsa, la distanza percorsa e le ubicazioni di salita e discesa del cliente. I dati relativi ai costi della corsa includono gli importi relativi a costo di base, imposte e mancia. I campi comuni in entrambi i tipi di record includono il numero di taxi, il numero di licenza e l'ID del fornitore. Questi tre campi identificano in modo univoco un taxi e un tassista. I dati vengono archiviati in formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Il generatore di dati è un'applicazione .NET Core che legge i record e li invia a Hub eventi di Azure. Il generatore invia i dati relativi alle corse in formato JSON e i dati relativi ai costi in formato CSV.

Hub eventi usa partizioni per segmentare i dati. Le partizioni consentono a un consumer di leggere ogni partizione in parallelo. Quando si inviano dati a Hub eventi, è possibile specificare in modo esplicito la chiave di partizione. In caso contrario, i record vengono assegnati alle partizioni in modalità round-robin.

In questo scenario i dati relativi alle corse e i dati relativi ai costi devono avere lo stesso ID di partizione per un taxi specifico. Ciò consente a Databricks di applicare un certo livello di parallelismo durante la correlazione dei due flussi. Un record nella partizione n dei dati relativi alle corse corrisponderà a un record nella partizione n dei dati relativi ai costi.

Diagramma di elaborazione dei flussi con Azure Databricks e l'hub eventi di Azure

Scaricare un file di Visio di questa architettura.

Nel generatore di dati il modello di dati comune per entrambi i tipi di record ha una proprietà PartitionKey che corrisponde alla concatenazione di Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Questa proprietà viene usata per fornire una chiave di partizione esplicita durante l'invio a Hub eventi:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hub eventi di

La capacità di elaborazione di Hub eventi viene misurata in unità elaborate. È possibile ridimensionare automaticamente un hub eventi abilitando l'aumento automatico, che ridimensiona automaticamente le unità elaborate in base al traffico, fino a un limite massimo configurato.

Elaborazione dei flussi

In Azure Databricks, viene eseguita l'elaborazione dei dati da un processo. Il processo viene assegnato a e viene eseguito in un cluster. Il processo può essere codice personalizzato scritto in Java o un notebook Spark.

In questa architettura di riferimento, il processo è un file di archivio Java con classi scritte in Java e Scala. Quando si specifica il file di archivio Java per un processo di Databricks, la classe viene specificata per l'esecuzione da parte del cluster Databricks. In questo caso, il main metodo della classe contiene la logica di com.microsoft.pnp.TaxiCabReader elaborazione dei dati.

Lettura del flusso dalle due istanze dell'hub eventi

La logica di elaborazione dei dati usa lo streaming strutturato Spark per leggere dalle due istanze dell'hub eventi di Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Arricchimento dei dati con le informazioni sul quartiere

I dati sulla corsa includono le coordinate di latitudine e longitudine dei punti di partenza e arrivo. Benché siano utili, queste coordinate non sono facilmente analizzabili. Di conseguenza, questi dati vengono arricchiti con i dati sul quartiere, letti da un file di forma.

Il formato di file di forma è binario e non facilmente analizzato, ma la libreria GeoTools fornisce strumenti per i dati geospaziali che usano il formato di file di forma. Questa libreria viene usata nella com.microsoft.pnp.GeoFinder classe com.microsoft.pnp.GeoFinder per determinare il nome del quartiere in base alle coordinate di partenza e arrivo.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Unione di dati su corse e tariffe

Prima di tutto i dati su corse e tariffe vengono trasformati:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Quindi, i dati sulla corsa vengono aggiunti ai dati sulle tariffe:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Elaborazione dati e inserimento in Azure Cosmos DB

L'importo tariffario medio per ogni quartiere viene calcolato per un determinato intervallo di tempo:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Che viene quindi inserito in Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerazioni

Queste considerazioni implementano i pilastri di Azure Well-Architected Framework, che è un set di principi guida che possono essere usati per migliorare la qualità di un carico di lavoro. Per altre informazioni, vedere Microsoft Azure Well-Architected Framework.

Sicurezza

La sicurezza offre garanzie contro attacchi intenzionali e l'abuso di dati e sistemi preziosi. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per Security.

L'accesso all'area di lavoro di Azure Databricks viene controllato usando la console di amministrazione La console di amministrazione include funzionalità per aggiungere utenti, gestire le autorizzazioni utente e configurare l'accesso Single Sign-On. La console permette anche di impostare il controllo di accesso ad aree di lavoro, cluster, processi e tabelle.

Gestione dei segreti

Azure Databricks include un archivio di segreti che viene utilizzato per archiviare le credenziali e fare riferimento a esse in notebook e processi. I segreti all'interno dell'archivio segreto di Azure Databricks vengono partizionati per ambiti:

databricks secrets create-scope --scope "azure-databricks-job"

I segreti vengono aggiunti a livello ambito:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota

È possibile utilizzare un ambito di cui è stato eseguito il backup in Azure Key Vault invece dell'ambito nativo di Azure Databricks. Per altre informazioni, vedere Azure Key Vault-backed scopes ( Ambiti di cui è stato eseguito il backup in Azure Key Vault).

Nel codice, i segreti sono accessibili grazie alle utilità dei segreti di Azure Databricks.

Ottimizzazione dei costi

L'ottimizzazione dei costi riguarda l'analisi dei modi per ridurre le spese non necessarie e migliorare l'efficienza operativa. Per altre informazioni, vedere Panoramica del pilastro di ottimizzazione dei costi.

Usare il calcolatore dei prezzi di Azure per stimare i costi. Ecco alcune considerazioni per i servizi usati in questa architettura di riferimento.

Considerazioni sul costo di Hub eventi

Questa architettura di riferimento distribuisce Hub eventi nel livello Standard . Il modello di determinazione dei prezzi si basa su unità elaborate, eventi in ingresso ed eventi di acquisizione. Un evento in ingresso è un'unità di dati da 64 KB o di dimensioni inferiori. I messaggi di dimensioni maggiori sono fatturati in multipli di 64 KB. È possibile specificare le unità elaborate tramite le API di gestione di hub eventi o di portale di Azure.

Se sono necessari più giorni di conservazione, prendere in considerazione il livello Dedicato . Questo livello offre distribuzioni a tenant singolo con requisiti più impegnativi. Questa offerta crea un cluster basato sulle unità di capacità (CU) non vincolato da unità elaborate.

Il livello Standard viene fatturato anche in base agli eventi in ingresso e alle unità elaborate.

Per informazioni sui prezzi di Hub eventi, vedere Prezzi di Hub eventi.

Considerazioni sul costo di Azure Databricks

Azure Databricks offre due livelli Standard e Premium ognuno supporta tre carichi di lavoro. Questa architettura di riferimento distribuisce l'area di lavoro di Azure Databricks nel livello Premium .

I carichi di lavoro di progettazione dei dati devono essere eseguiti in un cluster di processi e sono destinati ai data engineer per compilare ed eseguire processi. I carichi di lavoro di analisi dei dati devono essere eseguiti in un cluster all-purpose ed è destinato ai data scientist per esplorare, visualizzare, modificare e condividere dati e informazioni dettagliate in modo interattivo.

Azure Databricks offre molti modelli di prezzi.

  • Piano con pagamento in base al consumo

    Vengono fatturate le macchine virtuali (VM) di cui è stato effettuato il provisioning nei cluster e nelle unità Databricks in base all'istanza di macchina virtuale selezionata. Un'unità Databricks è un'unità di capacità del processo, fatturata in base all'utilizzo al secondo. Il consumo di unità Databricks dipende dalle dimensioni e dal tipo di istanza che esegue Azure Databricks. I prezzi dipendono dal carico di lavoro e dal livello selezionati.

  • Piano di pre-acquisto

    Si esegue il commit in unità DBU (Azure Databricks Unit) come unità di commit di Databricks (DBCU) per uno o tre anni per ridurre il costo totale di proprietà in tale periodo di tempo rispetto al modello con pagamento in base al consumo.

Per altre informazioni, vedere Prezzi di Azure Databricks.

Considerazioni sul costo di Azure Cosmos DB

In questa architettura, una serie di record viene scritta in Azure Cosmos DB dal processo di Azure Databricks. Viene addebitata la capacità che si riserva, espressa in Unità richiesta al secondo (UR/sec), usata per eseguire operazioni di inserimento. L'unità per la fatturazione è di 100 UR/sec all'ora. Ad esempio, il costo di scrittura di elementi da 100 KB è di 50 UR/sec.

Per le operazioni di scrittura, predisporre una capacità sufficiente a supportare il numero di scritture necessarie al secondo. È possibile aumentare la velocità effettiva fornita tramite il portale o l'interfaccia della riga di comando di Azure prima di eseguire operazioni di scrittura e quindi ridurre la velocità effettiva al termine di tali operazioni. La produttività per il periodo di scrittura è la produttività minima necessaria per i dati specificati, più la produttività richiesta per l'operazione di inserimento, supponendo che non sia in esecuzione nessun altro carico di lavoro.

Analisi dei costi di esempio

Si supponga di configurare un valore di velocità effettiva di 1.000 UR/sec in un contenitore. Viene distribuito per 24 ore per 30 giorni, un totale di 720 ore.

Il contenitore viene fatturato a 10 unità di 100 UR/sec all'ora per ogni ora. 10 unità a $ 0,008 (per 100 UR/sec all'ora) vengono addebitati $ 0,08 all'ora.

Per 720 ore o 7.200 unità (di 100 UR), vengono fatturati $ 57,60 per il mese.

Viene fatturato anche lo spazio di archiviazione per ogni GB utilizzato per i dati e l'indice archiviati. Per altre informazioni, vedere Modello di prezzi di Azure Cosmos DB.

Usare il calcolatore della capacità di Azure Cosmos DB per ottenere una stima rapida del costo del carico di lavoro.

Eccellenza operativa

L'eccellenza operativa copre i processi operativi che distribuiscono un'applicazione e lo mantengono in esecuzione nell'ambiente di produzione. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per l'eccellenza operativa.

Monitoraggio

Azure Databricks si basa su Apache Spark, e entrambi usano log4j come libreria standard per la registrazione. Oltre alla registrazione predefinita fornita da Apache Spark, è possibile implementare la registrazione in Azure Log Analytics seguendo l'articolo Monitoraggio di Azure Databricks.

Dal momento che com.microsoft.pnp.TaxiCabReader la classe com.microsoft.pnp.TaxiCabReader elabora i messaggi relativi a corse e tariffe, è possibile che il formato di uno dei due non sia valido. In un ambiente di produzione, è importante analizzare questi messaggi in formato non valido per identificare un problema con le origini dati in modo da risolverlo rapidamente per evitare la perdita di dati. La com.microsoft.pnp.TaxiCabReader classe com.microsoft.pnp.TaxiCabReader registra un accumulatore Apache Spark che tiene traccia del numero di record su tariffe e corse in formato non valido:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark usa la libreria Dropwizard per inviare metriche e alcuni dei campi metrici nativi di Dropwizard non sono compatibili con Azure Log Analytics. Di conseguenza, questa architettura di riferimento include un sink e un reporter di Dropwizard personalizzati. Formatta le metriche nel formato previsto da Azure Log Analytics. Quando Apache Spark riporta le metriche, vengono inviate anche le metriche personalizzate per i dati di corse e tariffe in formato non valido.

Di seguito sono riportate query di esempio che è possibile usare nell'area di lavoro Log Analytics di Azure per monitorare l'esecuzione del processo di streaming. L'argomento ago(1d) in ogni query restituirà tutti i record generati nell'ultimo giorno e può essere modificato per visualizzare un periodo di tempo diverso.

Eccezioni registrate durante l'esecuzione di query di flusso

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Accumulo di dati su tariffe e corse in formato non valido

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Esecuzione del lavoro nel tempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Per ulteriori informazioni, vedere Monitoraggio di Azure Databricks.

Organizzazione delle risorse e distribuzioni

  • Creare gruppi di risorse separati per gli ambienti di produzione, sviluppo e test. L'uso di gruppi di risorse separati semplifica la gestione delle distribuzioni, l'eliminazione delle distribuzioni di test e l'assegnazione dei diritti di accesso.

  • Usare il modello di Azure Resource Manager per distribuire le risorse di Azure seguendo il processo IaC (Infrastructure as Code). Con i modelli, l'automazione delle distribuzioni con Azure DevOps Services o altre soluzioni CI/CD è più semplice.

  • Inserire ogni carico di lavoro in un modello di distribuzione separato e archiviare le risorse nei sistemi di controllo del codice sorgente. È possibile distribuire i modelli insieme o separatamente come parte di un processo CI/CD, rendendo il processo di automazione più semplice.

    In questa architettura, Hub eventi di Azure, Log Analytics e Azure Cosmos DB vengono identificati come un singolo carico di lavoro. Queste risorse sono incluse in un singolo modello di Resource Manager.

  • Prendere in considerazione la gestione temporanea dei carichi di lavoro. Eseguire la distribuzione in varie fasi ed eseguire controlli di convalida in ogni fase prima di passare alla fase successiva. In questo modo è possibile eseguire il push degli aggiornamenti negli ambienti di produzione in modo altamente controllato e ridurre al minimo i problemi di distribuzione imprevisti.

    In questa architettura sono presenti più fasi di distribuzione. Prendere in considerazione la creazione di una pipeline di Azure DevOps e l'aggiunta di tali fasi. Ecco alcuni esempi di fasi che è possibile automatizzare:

    • Avviare un cluster di Databricks
    • Configurare l'interfaccia della riga di comando di Databricks
    • Installa gli strumenti Scala
    • Aggiungere i segreti di Databricks

    Valutare anche la possibilità di scrivere test di integrazione automatizzati per migliorare la qualità e l'affidabilità del codice databricks e del ciclo di vita.

Distribuire lo scenario

Per distribuire ed eseguire l'implementazione di riferimento, seguire la procedura illustrata nel file README in GitHub.

Passaggio successivo