Upravit

Sdílet prostřednictvím


Zpracování streamů s využitím Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Tato referenční architektura ukazuje kompletní kanál zpracování datových proudů. Čtyři fáze tohoto kanálu jsou ingestování, zpracování, ukládání a analýza a sestava. V této referenční architektuře kanál ingestuje data ze dvou zdrojů, provádí spojení souvisejících záznamů z každého datového proudu, rozšiřuje výsledek a vypočítá průměr v reálném čase. Výsledky se pak uloží pro další analýzu.

Logo GitHubu Referenční implementace pro tuto architekturu je k dispozici na GitHubu.

Architektura

Diagram znázorňující referenční architekturu pro zpracování datových proudů pomocí Azure Databricks

Stáhněte si soubor Visio této architektury.

Workflow

Následující tok dat odpovídá předchozímu diagramu:

  1. V této architektuře existují dva zdroje dat, které generují datové proudy v reálném čase. První datový proud obsahuje informace o jízdě a druhý proud obsahuje informace o jízdě. Referenční architektura zahrnuje simulovaný generátor dat, který čte ze sady statických souborů a odesílá data do služby Azure Event Hubs. Zdroje dat v reálné aplikaci jsou zařízení nainstalovaná v taxislužbách.

  2. služba Event Hubs je služba pro příjem událostí. Tato architektura používá dvě instance centra událostí, jednu pro každý zdroj dat. Každý zdroj dat odešle datový proud do přidruženého centra událostí.

  3. Azure Databricks je analytická platforma založená na Apache Sparku, která je optimalizovaná pro platformu cloudových služeb Microsoft Azure. Azure Databricks se používá ke korelaci dat o jízdě taxíkem a jízdě a jízdě a obohacení korelovaných dat o data sousedství, která jsou uložená v systému souborů Azure Databricks.

  4. azure Cosmos DB je plně spravovaná databázová služba s více modely. Výstup úlohy Azure Databricks je řada záznamů, které se zapisují do služby Azure Cosmos DB pro Apache Cassandra. Azure Cosmos DB pro Apache Cassandra se používá, protože podporuje modelování dat časových řad.

  5. Log Analytics je nástroj ve službě Azure Monitor, který umožňuje dotazovat a analyzovat data protokolů z různých zdrojů. Data protokolu aplikací, která azure Monitor shromažďuje, jsou uložená v pracovním prostoru služby Log Analytics. Pomocí dotazů Log Analytics můžete analyzovat a vizualizovat metriky a kontrolovat zprávy protokolu a identifikovat problémy v aplikaci.

Podrobnosti scénáře

Taxislužba shromažďuje data o každé jízdě taxíkem. V tomto scénáři předpokládáme, že dvě samostatná zařízení odesílají data. Taxi má měřič, který odesílá informace o každé jízdě, včetně doby trvání, vzdálenosti a vyzvednutí a odkládacích míst. Samostatné zařízení přijímá platby od zákazníků a odesílá data o jízdné. Pokud chce taxislužba zjistit trendy ridershipu, chce vypočítat průměrný tip na míle řízený pro každou čtvrť v reálném čase.

Příjem dat

K simulaci zdroje dat tato referenční architektura používá datovou sadu dat taxislužby New York City1. Tato datová sada obsahuje data o jízdách taxíkem v New Yorku od roku 2010 do roku 2013. Obsahuje záznamy dat o jízdě i jízdě. Data o jízdě zahrnují dobu jízdy, vzdálenost jízdy a vyzvednutí a odkládací místa. Údaje o jízdné zahrnují ceny jízdné, daně a tipové částky. Pole v obou typech záznamů zahrnují číslo medailiónu, licenci hacku a ID dodavatele. Kombinace těchto tří polí jednoznačně identifikuje taxi a řidiče. Data se ukládají ve formátu CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois ve společnosti Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Generátor dat je aplikace .NET Core, která čte záznamy a odesílá je do služby Event Hubs. Generátor odesílá data jízdy ve formátu JSON a data jízdného ve formátu CSV.

Služba Event Hubs používá k segmentování dat oddíly . Oddíly umožňují příjemci číst každý oddíl paralelně. Když odesíláte data do služby Event Hubs, můžete klíč oddílu zadat přímo. V opačném případě se záznamy přiřazují k oddílům způsobem kruhového dotazování.

V tomto scénáři by data jízdy a jízdné měly být přiřazeny stejnému ID oddílu pro konkrétní taxislužba. Toto přiřazení umožňuje Databricks použít stupeň paralelismu, když koreluje tyto dva datové proudy. Například záznam v oddílu n dat jízdy odpovídá záznamu v oddílu n dat jízdy.

Diagram zpracování datových proudů pomocí Azure Databricks a Event Hubs

Stáhněte si soubor Visia této architektury.

V generátoru dat má společný datový model pro oba typy PartitionKey záznamů vlastnost, která je zřetězením Medallion, HackLicensea VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Tato vlastnost slouží k poskytnutí explicitního klíče oddílu při odesílání dat do služby Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Kapacita propustnosti služby Event Hubs se měří v jednotkách propustnosti. Centrum událostí můžete automaticky škálovat povolením automatického nafouknutí . Tato funkce automaticky škáluje jednotky propustnosti na základě provozu až na nakonfigurované maximum.

Zpracování datových proudů

V Azure Databricks provádí úloha zpracování dat. Úloha se přiřadí ke clusteru a pak na něm běží. Úloha může být vlastní kód napsaný v Javě nebo poznámkový blok Spark .

V této referenční architektuře je úloha archiv Jazyka Java, který obsahuje třídy napsané v Javě a Scala. Když zadáte archiv Javy pro úlohu Databricks, cluster Databricks určuje třídu pro operaci. main Zde metoda com.microsoft.pnp.TaxiCabReader třídy obsahuje logiku zpracování dat.

Čtení datového proudu ze dvou instancí centra událostí

Logika zpracování dat používá strukturované streamování Sparku ke čtení ze dvou instancí centra událostí Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Obohacení dat informacemi o sousedství

Data o jízdě zahrnují souřadnice zeměpisné šířky a délky vyzvednutí a odkládacích míst. Tyto souřadnice jsou užitečné, ale nejsou snadno využité k analýze. Proto jsou tato data rozšířena o data sousedství, která se čtou ze souboru obrazce .

Formát souboru shapefile je binární a není snadno parsován. Knihovna GeoTools ale poskytuje nástroje pro geoprostorová data, která používají formát souboru shapefile. Tato knihovna se používá v com.microsoft.pnp.GeoFinder třídě k určení názvu sousedství na základě souřadnic pro vyzvednutí a odkládací umístění.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Připojte se k jízdě a datům o jízdě a jízdě

Nejprve se transformují data jízdy a jízdy:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Data jízdy se pak spojí s daty jízdného:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Zpracování dat a jejich vložení do služby Azure Cosmos DB

Průměrná částka jízdného pro každou čtvrť se vypočítá pro konkrétní časový interval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Průměrná částka jízdy se pak vloží do služby Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Důležité informace

Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které můžete použít ke zlepšení kvality úlohy. Další informace naleznete v tématu Microsoft Azure Well-Architected Framework.

Zabezpečení

Zabezpečení poskytuje záruky proti záměrným útokům a zneužití cenných dat a systémů. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu prozabezpečení .

Přístup k pracovnímu prostoru Azure Databricks se řídí pomocí konzoly správce . Konzola správce obsahuje funkce pro přidání uživatelů, správu uživatelských oprávnění a nastavení jednotného přihlašování. Řízení přístupu pro pracovní prostory, clustery, úlohy a tabulky je také možné nastavit prostřednictvím konzoly správce.

Správa tajných kódů

Azure Databricks obsahuje úložiště tajných kódů, které slouží k ukládání přihlašovacích údajů a odkazování na ně v poznámkových blocích a úlohách. Rozsahy tajných kódů oddílů v úložišti tajných kódů Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Tajné kódy se přidají na úrovni oboru:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Poznámka:

Místo nativního oboru Azure Databricks použijte rozsah založený na službě Azure Key Vault.

V kódu jsou tajné kódy přístupné prostřednictvím nástrojů pro tajné kódy Azure Databricks.

Optimalizace nákladů

Optimalizace nákladů se zaměřuje na způsoby, jak snížit zbytečné výdaje a zlepšit efektivitu provozu. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu proOptimalizace nákladů .

K odhadu nákladů použijte cenovou kalkulačku Azure. Vezměte v úvahu následující služby používané v této referenční architektuře.

Důležité informace o nákladech služby Event Hubs

Tato referenční architektura nasadí službu Event Hubs na úrovni Standard. Cenový model je založený na jednotkách propustnosti, událostech příchozího přenosu dat a zachytávání událostí. Událost příchozího přenosu dat je jednotka dat, která jsou 64 kB nebo menší. Větší zprávy se účtují v násobcích po 64 kB. Jednotky propustnosti zadáte prostřednictvím webu Azure Portal nebo rozhraní API pro správu služby Event Hubs.

Pokud potřebujete více dnů uchovávání, zvažte úroveň Dedicated. Tato úroveň poskytuje nasazení s jedním tenantem, která mají přísné požadavky. Tato nabídka vytvoří cluster založený na jednotkách kapacity a není závislý na jednotkách propustnosti. Úroveň Standard se také účtuje na základě událostí příchozího přenosu dat a jednotek propustnosti.

Další informace najdete v tématu ceny služby Event Hubs.

Důležité informace o nákladech na Azure Databricks

Azure Databricks poskytuje úroveň Standard a úroveň Premium, z nichž obě podporují tři úlohy. Tato referenční architektura nasadí pracovní prostor Azure Databricks na úrovni Premium.

Úlohy přípravy dat by se měly spouštět v clusteru úloh. Datoví inženýři používají clustery k vytváření a provádění úloh. Úlohy analýzy dat by se měly spouštět v clusteru pro všechny účely a jsou určené pro datové vědce, aby mohli interaktivně zkoumat, vizualizovat, manipulovat s nimi a sdílet data a přehledy.

Azure Databricks poskytuje více cenových modelů.

  • plán průběžných plateb

    Účtují se vám virtuální počítače zřízené v clusterech a jednotkách Azure Databricks (DBU) na základě zvolené instance virtuálního počítače. DBU je jednotka schopností zpracování, která se účtuje podle využití za sekundu. Spotřeba DBU závisí na velikosti a typu instance, která běží v Azure Databricks. Ceny závisí na zvolené úloze a úrovni.

  • předkupní plán

    Do jednotek potvrzení Azure Databricks se zavazujete po dobu jednoho nebo tří let, abyste snížili celkové náklady na vlastnictví v daném časovém období v porovnání s modelem průběžných plateb.

Další informace najdete v tématu cenovýchAzure Databricks .

Důležité informace o nákladech služby Azure Cosmos DB

V této architektuře úloha Azure Databricks zapisuje do služby Azure Cosmos DB řadu záznamů. Účtuje se vám kapacita, kterou si rezervujete, která se měří v jednotkách žádostí za sekundu (RU/s). Tato kapacita se používá k provádění operací vložení. Jednotka pro fakturaci je 100 RU/s za hodinu. Například náklady na zápis 100 kB položek jsou 50 RU/s.

V případě operací zápisu zřiďte dostatečnou kapacitu pro podporu počtu zápisů potřebných za sekundu. Zřízenou propustnost můžete zvýšit pomocí portálu nebo Azure CLI před provedením operací zápisu a následným snížením propustnosti po dokončení těchto operací. Propustnost pro období zápisu je součet minimální propustnosti potřebné pro konkrétní data a propustnost potřebnou pro operaci vložení. Tento výpočet předpokládá, že není spuštěná žádná jiná úloha.

Příklad analýzy nákladů

Předpokládejme, že pro kontejner nakonfigurujete hodnotu propustnosti 1 000 RU/s. Je nasazená po dobu 24 hodin po dobu 30 dnů, celkem 720 hodin.

Kontejner se účtuje za 10 jednotek 100 RU/s za hodinu za každou hodinu. Deset jednotek v hodnotě 0,008 USD (za 100 RU/s za hodinu) se účtují v hodnotě 0,08 USD za hodinu.

Za 720 hodin nebo 7 200 jednotek (100 RU) se vám účtuje 57,60 USD za měsíc.

Úložiště se také účtuje za každou GB, která se používá pro uložená data a index. Další informace najdete v cenovém modelu služby Azure Cosmos DB.

K rychlému odhadu nákladů na úlohy použijte kalkulačku kapacity služby Azure Cosmos DB.

Efektivita provozu

Efektivita provozu se zabývá provozními procesy, které nasazují aplikaci a udržují ji spuštěnou v produkčním prostředí. Další informace naleznete v tématu kontrolní seznam pro kontrolu efektivity provozu.

Sledování

Azure Databricks je založen na Apache Sparku. Azure Databricks i Apache Spark používají Apache Log4j jako standardní knihovnu pro protokolování. Kromě výchozího protokolování, které Poskytuje Apache Spark, můžete implementovat protokolování v Log Analytics. Další informace najdete v tématu Monitorování Azure Databricks.

Protože třída com.microsoft.pnp.TaxiCabReader zpracovává zprávy jízdy a jízdy, může být zpráva poškozena, a proto není platná. V produkčním prostředí je důležité analyzovat tyto poškozené zprávy a identifikovat problém se zdroji dat, aby bylo možné je rychle opravit, aby se zabránilo ztrátě dat. Třída com.microsoft.pnp.TaxiCabReader zaregistruje akumulátor Apache Sparku, který sleduje počet poškozených záznamů jízdy a záznamů jízdy:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark používá k odesílání metrik knihovnu Dropwizard. Některá nativní pole metrik Dropwizard nejsou kompatibilní s Log Analytics, což je důvod, proč tato referenční architektura zahrnuje vlastní jímku Dropwizard a reporter. Formátuje metriky ve formátu, který Log Analytics očekává. Když Apache Spark hlásí metriky, posílají se také vlastní metriky pro špatně zformulovaná data jízdy a jízdného.

Pomocí následujících ukázkových dotazů v pracovním prostoru služby Log Analytics můžete monitorovat provoz úlohy streamování. Argument ago(1d) v každém dotazu vrátí všechny záznamy, které byly vygenerovány za poslední den. Tento parametr můžete upravit, aby se zobrazilo jiné časové období.

Výjimky zaprotokolované během operace dotazu streamu

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Akumulace poškozených jízdných dat a dat o jízdě

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Operace úlohy v průběhu času

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Organizace prostředků a nasazení

  • Vytvořte samostatné skupiny prostředků pro produkční, vývojové a testovací prostředí. Samostatné skupiny prostředků usnadňují správu nasazení, odstraňování testovacích nasazení a přiřazování přístupových práv.

  • Pomocí šablony Azure Resource Manageru nasaďte prostředky Azure podle procesu infrastruktury jako kódu. Pomocí šablon můžete snadno automatizovat nasazení pomocí služeb Azure DevOps nebo jiných řešení kontinuální integrace a průběžného doručování (CI/CD).

  • Každou úlohu umístěte do samostatné šablony nasazení a uložte prostředky do systémů správy zdrojového kódu. Šablony můžete nasadit společně nebo jednotlivě jako součást procesu CI/CD. Tento přístup zjednodušuje proces automatizace.

    V této architektuře jsou služba Event Hubs, Log Analytics a Azure Cosmos DB identifikovány jako jedna úloha. Tyto prostředky jsou součástí jedné šablony Azure Resource Manageru.

  • Zvažte přípravu úloh. Nasaďte je do různých fází a před přechodem do další fáze spusťte kontroly ověřování v každé fázi. Díky tomu můžete řídit způsob nabízení aktualizací do produkčních prostředí a minimalizovat neočekávané problémy s nasazením.

    V této architektuře existuje několik fází nasazení. Zvažte vytvoření kanálu Azure DevOps a přidání těchto fází. Můžete automatizovat následující fáze:

    • Spusťte cluster Databricks.
    • Konfigurace rozhraní příkazového řádku Databricks
    • Nainstalujte nástroje Scala.
    • Přidejte tajné kódy Databricks.

    Zvažte psaní automatizovaných integračních testů za účelem zlepšení kvality a spolehlivosti kódu Databricks a jeho životního cyklu.

Nasazení tohoto scénáře

Pokud chcete nasadit a spustit referenční implementaci, postupujte podle kroků v souboru readme GitHubu.

Další krok