Redigera

Dela via


Strömbearbetning med Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutpunkt till slutpunkt. Den här typen av pipeline har fyra steg: inmatning, process, lagring och analys och rapportering. För den här referensarkitekturen matar pipelinen in data från två källor, utför en koppling på relaterade poster från varje dataström, berikar resultatet och beräknar ett genomsnitt i realtid. Resultaten lagras för ytterligare analys.

GitHub-logotyp En referensimplementering för den här arkitekturen finns på GitHub.

Arkitektur

Diagram som visar en referensarkitektur för dataströmbearbetning med Azure Databricks.

Ladda ned en Visio-fil med den här arkitekturen.

Arbetsflöde

Arkitekturen består av följande komponenter:

Datakällor. I den här arkitekturen finns det två datakällor som genererar dataströmmar i realtid. Den första strömmen innehåller färdinformation och den andra innehåller information om priser. Referensarkitekturen innehåller en simulerad datagenerator som läser från en uppsättning statiska filer och skickar data till Event Hubs. Datakällorna i ett verkligt program skulle vara enheter installerade i taxibilarna.

Azure Event Hubs. Event Hubs är en händelseinmatningstjänst . Den här arkitekturen använder två händelsehubbinstanser, en för varje datakälla. Varje datakälla skickar en dataström till den associerade händelsehubben.

Azure Databricks. Databricks är en Apache Spark-baserad analysplattform som är optimerad för Microsoft Azure-molntjänstplattformen. Databricks används för att korrelera taxiresa och biljettdata, och även för att berika korrelerade data med grannskapsdata som lagras i Databricks-filsystemet.

Azure Cosmos DB. Utdata från ett Azure Databricks-jobb är en serie poster som skrivs till Azure Cosmos DB för Apache Cassandra. Azure Cosmos DB för Apache Cassandra används eftersom det stöder datamodellering för tidsserier.

Azure Log Analytics. Programloggdata som samlas in av Azure Monitor lagras på en Log Analytics-arbetsyta. Log Analytics-frågor kan användas för att analysera och visualisera mått och inspektera loggmeddelanden för att identifiera problem i programmet.

Information om scenario

Scenario: Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot antar vi att det finns två separata enheter som skickar data. Taxin har en mätare som skickar information om varje resa – varaktighet, avstånd och upphämtnings- och avlämningsplatser. En separat enhet accepterar betalningar från kunder och skickar data om priser. För att upptäcka ridership-trender vill taxiföretaget beräkna den genomsnittliga dricksen per körd mil, i realtid, för varje grannskap.

Datainsamling

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen New York City Taxi Data[1]. Den här datamängden innehåller data om taxiresor i New York City under en fyraårsperiod (2010–2013). Den innehåller två typer av poster: Ride-data och prisdata. Ride-data inkluderar resans varaktighet, reseavstånd och upphämtnings- och avlämningsplats. Prisdata inkluderar belopp för biljettpriser, skatter och tips. Vanliga fält i båda posttyperna är medaljongnummer, hacklicens och leverantörs-ID. Tillsammans identifierar dessa tre fält unikt en taxi plus en förare. Data lagras i CSV-format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois vid Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Azure Event Hubs. Generatorn skickar kördata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Med partitioner kan en konsument läsa varje partition parallellt. När du skickar data till Event Hubs kan du uttryckligen ange partitionsnyckeln. I annat fall tilldelas poster till partitioner i resursallokering.

I det här scenariot bör kördata och prisdata få samma partitions-ID för en viss taxi. Detta gör det möjligt för Databricks att tillämpa en viss grad av parallellitet när det korrelerar de två strömmarna. En post i partition n av kördata matchar en post i partition n av prisdata.

Diagram över dataströmbearbetning med Azure Databricks och Event Hubs.

Ladda ned en Visio-fil med den här arkitekturen.

I datageneratorn har den gemensamma datamodellen för båda posttyperna en PartitionKey egenskap som är sammanlänkningen av Medallion, HackLicenseoch VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen används för att tillhandahålla en explicit partitionsnyckel när du skickar till Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i dataflödesenheter. Du kan autoskala en händelsehubb genom att aktivera automatisk blåsning, vilket automatiskt skalar dataflödesenheterna baserat på trafik, upp till ett konfigurerat maxvärde.

Dataströmbearbetning

I Azure Databricks utförs databearbetning av ett jobb. Jobbet tilldelas till och körs i ett kluster. Jobbet kan antingen vara anpassad kod skriven i Java eller en Spark-notebook-fil.

I den här referensarkitekturen är jobbet ett Java-arkiv med klasser skrivna i både Java och Scala. När du anger Java-arkivet för ett Databricks-jobb anges klassen för körning av Databricks-klustret. main Här innehåller -metoden för com.microsoft.pnp.TaxiCabReader klassen databearbetningslogik.

Läsa strömmen från de två händelsehubbinstanserna

Databearbetningslogik använder Spark-strukturerad strömning för att läsa från de två Azure-händelsehubbens instanser:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Utöka data med grannskapsinformationen

Kördata innehåller koordinaterna för latitud och longitud för upphämtnings- och avlämningsplatserna. Även om dessa koordinater är användbara är de inte lätta att använda för analys. Därför berikas dessa data med grannskapsdata som läss från en formfil.

Formfilformatet är binärt och inte enkelt parsat, men GeoTools-biblioteket innehåller verktyg för geospatiala data som använder formfilformatet. Det här biblioteket används i com.microsoft.pnp.GeoFinder klassen för att fastställa grannskapsnamnet baserat på koordinaterna för hämtning och avlämning.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Gå med i ride- och fare-data

Först transformeras ride- och fare-data:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Och sedan kopplas ride-data till prisdata:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Bearbeta data och infoga i Azure Cosmos DB

Det genomsnittliga prisbeloppet för varje grannskap beräknas för ett visst tidsintervall:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Som sedan infogas i Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Att tänka på

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som kan användas för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.

Säkerhet

Säkerhet ger garantier mot avsiktliga attacker och missbruk av dina värdefulla data och system. Mer information finns i checklistan för Designgranskning för Security.

Åtkomst till Azure Databricks-arbetsytan styrs med hjälp av -administratörskonsolen Administratörskonsolen innehåller funktioner för att lägga till användare, hantera användarbehörigheter och konfigurera enkel inloggning. Åtkomstkontroll för arbetsytor, kluster, jobb och tabeller kan också anges via administratörskonsolen.

Hantera hemligheter

Azure Databricks innehåller ett hemligt arkiv som används för att lagra autentiseringsuppgifter och referera till dem i notebook-filer och jobb. Hemligheter i Azure Databricks-hemlighetslagret partitioneras av omfång:

databricks secrets create-scope --scope "azure-databricks-job"

Hemligheter läggs till på omfångsnivå:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Kommentar

Ett Azure Key Vault-backat omfång bör användas i stället för det interna Azure Databricks-omfånget. Mer information finns i Azure Key Vault-stödda omfång.

I kod används hemligheter via Azure Databricks-funktionerna för hemligheter.

Kostnadsoptimering

Kostnadsoptimering handlar om att titta på sätt att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i Översikt över kostnadsoptimeringspelare.

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Här följer några överväganden för tjänster som används i den här referensarkitekturen.

Kostnadsöverväganden för Event Hubs

Den här referensarkitekturen distribuerar Event Hubs på standardnivån . Prismodellen baseras på dataflödesenheter, ingresshändelser och avbildningshändelser. En ingångshändelse är en enhet med data som är 64 kB eller mindre. Större meddelanden faktureras i multipler av 64 kB. Du anger dataflödesenheter antingen via Azure Portal- eller Event Hubs-hanterings-API:er.

Om du behöver fler kvarhållningsdagar bör du överväga den dedikerade nivån. Den här nivån erbjuder distributioner med en enda klientorganisation med de mest krävande kraven. Det här erbjudandet bygger ett kluster baserat på kapacitetsenheter (CU) som inte är bundna av dataflödesenheter.

Standardnivån faktureras också baserat på ingresshändelser och dataflödesenheter.

Information om priser för Event Hubs finns i Event Hubs-prissättningen.

Kostnadsöverväganden för Azure Databricks

Azure Databricks erbjuder två nivåer Standard och Premium har vardera stöd för tre arbetsbelastningar. Den här referensarkitekturen distribuerar Azure Databricks-arbetsytan på Premium-nivån .

Datateknikarbetsbelastningar ska köras på ett jobbkluster och är till för att datatekniker ska kunna skapa och köra jobb. Arbetsbelastningar för dataanalys ska köras i ett kluster för alla syften och är avsedda för dataexperter att utforska, visualisera, manipulera och dela data och insikter interaktivt.

Azure Databricks erbjuder många prismodeller.

  • Betala per användning-abonnemang

    Du debiteras för virtuella datorer (VM) som etablerats i kluster och Databricks-enheter (DBUs) baserat på den valda VM-instansen. En DBU är en bearbetningsenhet som debiteras per sekund. DBU-förbrukningen beror på storleken och typen av instans som kör Azure Databricks. Prissättningen beror på den valda arbetsbelastningen och nivån.

  • Förköpsplan

    Du förbinder dig till Azure Databricks Units (DBU) som Databricks Commit Units (DBCU) i antingen ett eller tre år för att minska den totala ägandekostnaden under den tidsperioden jämfört med modellen betala per användning.

Mer information finns i Prissättning för Azure Databricks.

Kostnadsöverväganden för Azure Cosmos DB

I den här arkitekturen skrivs en serie poster till Azure Cosmos DB av Azure Databricks-jobbet. Du debiteras för den kapacitet som du reserverar, uttryckt i Enheter för begäran per sekund (RU/s), som används för att utföra infogningsåtgärder. Faktureringsenheten är 100 RU/sek per timme. Till exempel är kostnaden för att skriva 100 KB-objekt 50 RU/s.

För skrivåtgärder etablerar du tillräckligt med kapacitet för att stödja antalet skrivningar som behövs per sekund. Du kan öka det etablerade dataflödet med hjälp av portalen eller Azure CLI innan du utför skrivåtgärder och sedan minska dataflödet när dessa åtgärder har slutförts. Ditt dataflöde för skrivperioden är det minsta dataflöde som krävs för de angivna data plus det dataflöde som krävs för infogningsåtgärden förutsatt att ingen annan arbetsbelastning körs.

Exempel på kostnadsanalys

Anta att du konfigurerar ett dataflödesvärde på 1 000 RU/s i en container. Den distribueras i 24 timmar i 30 dagar, totalt 720 timmar.

Containern faktureras med 10 enheter på 100 RU/sek per timme för varje timme. 10 enheter till 0,008 USD (per 100 RU/sek per timme) debiteras 0,08 USD per timme.

För 720 timmar eller 7 200 enheter (av 100 RU:er) debiteras du 57,60 USD för månaden.

Lagring faktureras också för varje GB som används för dina lagrade data och index. Mer information finns i Prismodellen för Azure Cosmos DB.

Använd Kapacitetskalkylatorn för Azure Cosmos DB för att få en snabb uppskattning av arbetsbelastningskostnaden.

Operational Excellence

Operational Excellence omfattar de driftsprocesser som distribuerar ett program och håller det igång i produktion. Mer information finns i checklistan för Designgranskning för Operational Excellence.

Övervakning

Azure Databricks baseras på Apache Spark och båda använder log4j som standardbibliotek för loggning. Förutom standardloggningen som tillhandahålls av Apache Spark kan du implementera loggning till Azure Log Analytics genom att följa artikeln Övervakning av Azure Databricks.

com.microsoft.pnp.TaxiCabReader Eftersom klassen bearbetar ride- och fare-meddelanden är det möjligt att någon av dem kan vara felaktigt formaterad och därför inte giltig. I en produktionsmiljö är det viktigt att analysera dessa felaktiga meddelanden för att identifiera ett problem med datakällorna så att det snabbt kan åtgärdas för att förhindra dataförlust. Klassen com.microsoft.pnp.TaxiCabReader registrerar en Apache Spark Accumulator som håller reda på antalet felaktiga biljett- och åkrekord:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark använder Dropwizard-biblioteket för att skicka mått, och några av de interna Dropwizard-måttfälten är inte kompatibla med Azure Log Analytics. Därför innehåller den här referensarkitekturen en anpassad Dropwizard-mottagare och reporter. Den formaterar måtten i det format som förväntas av Azure Log Analytics. När Apache Spark rapporterar mått skickas också anpassade mått för den felaktiga resan och prisdata.

Följande är exempelfrågor som du kan använda på din Azure Log Analytics-arbetsyta för att övervaka körningen av strömningsjobbet. Argumentet ago(1d) i varje fråga returnerar alla poster som genererades under den senaste dagen och kan justeras för att visa en annan tidsperiod.

Undantag som loggas under körning av stream-frågor

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Ackumulering av felaktiga biljett- och kördata

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Jobbkörning över tid

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Mer information finns i Övervaka Azure Databricks.

Resursorganisation och distributioner

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager-mallen för att distribuera Azure-resurserna efter IaC-processen (infrastruktur som kod). Med mallar är det enklare att automatisera distributioner med Hjälp av Azure DevOps Services eller andra CI/CD-lösningar.

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystemen. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process, vilket gör automatiseringsprocessen enklare.

    I den här arkitekturen identifieras Azure Event Hubs, Log Analytics och Azure Cosmos DB som en enda arbetsbelastning. Dessa resurser ingår i en enda ARM-mall.

  • Överväg att mellanlagring av dina arbetsbelastningar. Distribuera till olika steg och kör valideringskontroller i varje steg innan du går vidare till nästa steg. På så sätt kan du push-överföra uppdateringar till dina produktionsmiljöer på ett mycket kontrollerat sätt och minimera oväntade distributionsproblem.

    I den här arkitekturen finns det flera distributionssteg. Överväg att skapa en Azure DevOps-pipeline och lägga till dessa steg. Här är några exempel på steg som du kan automatisera:

    • Starta ett Databricks-kluster
    • Konfigurera Databricks CLI
    • Installera Scala Tools
    • Lägg till Databricks-hemligheterna

    Överväg också att skriva automatiserade integreringstester för att förbättra kvaliteten och tillförlitligheten för Databricks-koden och dess livscykel.

Distribuera det här scenariot

Följ stegen i GitHub-readme för att distribuera och köra referensimplementeringen.

Gå vidare