Tato referenční architektura ukazuje kompletní kanál zpracování datových proudů. Čtyři fáze tohoto kanálu jsou ingestování, zpracování, ukládání a analýza a sestava. V této referenční architektuře kanál ingestuje data ze dvou zdrojů, provádí spojení souvisejících záznamů z každého datového proudu, rozšiřuje výsledek a vypočítá průměr v reálném čase. Výsledky se pak uloží pro další analýzu.
Referenční implementace pro tuto architekturu je k dispozici na GitHubu.
Architektura
Stáhněte si soubor Visio této architektury.
Workflow
Následující tok dat odpovídá předchozímu diagramu:
V této architektuře existují dva zdroje dat, které generují datové proudy v reálném čase. První datový proud obsahuje informace o jízdě a druhý proud obsahuje informace o jízdě. Referenční architektura zahrnuje simulovaný generátor dat, který čte ze sady statických souborů a odesílá data do služby Azure Event Hubs. Zdroje dat v reálné aplikaci jsou zařízení nainstalovaná v taxislužbách.
služba Event Hubs je služba pro příjem událostí. Tato architektura používá dvě instance centra událostí, jednu pro každý zdroj dat. Každý zdroj dat odešle datový proud do přidruženého centra událostí.
Azure Databricks je analytická platforma založená na Apache Sparku, která je optimalizovaná pro platformu cloudových služeb Microsoft Azure. Azure Databricks se používá ke korelaci dat o jízdě taxíkem a jízdě a jízdě a obohacení korelovaných dat o data sousedství, která jsou uložená v systému souborů Azure Databricks.
azure Cosmos DB je plně spravovaná databázová služba s více modely. Výstup úlohy Azure Databricks je řada záznamů, které se zapisují do služby Azure Cosmos DB pro Apache Cassandra. Azure Cosmos DB pro Apache Cassandra se používá, protože podporuje modelování dat časových řad.
Azure Synapse Link pro službu Azure Cosmos DB umožňuje spouštět analýzy provozních dat v téměř reálném čase ve službě Azure Cosmos DB bez jakéhokoliv výkonu nebo nákladů na transakční úlohy. Tyto výsledky můžete dosáhnout pomocí bezserverového fondu SQL a fondů Spark. Tyto analytické moduly jsou k dispozici v pracovním prostoru Služby Azure Synapse Analytics.
zrcadlení služby Azure Cosmos DB for NoSQL v Microsoft Fabric umožňuje integrovat data Azure Cosmos DB se zbývajícími daty v Microsoft Fabric.
Log Analytics je nástroj ve službě Azure Monitor, který umožňuje dotazovat a analyzovat data protokolů z různých zdrojů. Data protokolu aplikací, která azure Monitor shromažďuje, jsou uložená v pracovním prostoru služby Log Analytics. Pomocí dotazů Log Analytics můžete analyzovat a vizualizovat metriky a kontrolovat zprávy protokolu a identifikovat problémy v aplikaci.
Podrobnosti scénáře
Taxislužba shromažďuje data o každé jízdě taxíkem. V tomto scénáři předpokládáme, že dvě samostatná zařízení odesílají data. Taxi má měřič, který odesílá informace o každé jízdě, včetně doby trvání, vzdálenosti a vyzvednutí a odkládacích míst. Samostatné zařízení přijímá platby od zákazníků a odesílá data o jízdné. Pokud chce taxislužba zjistit trendy ridershipu, chce vypočítat průměrný tip na míle řízený pro každou čtvrť v reálném čase.
Příjem dat
K simulaci zdroje dat tato referenční architektura používá datovou sadu dat taxislužby New York City1. Tato datová sada obsahuje data o jízdách taxíkem v New Yorku od roku 2010 do roku 2013. Obsahuje záznamy dat o jízdě i jízdě. Data o jízdě zahrnují dobu jízdy, vzdálenost jízdy a vyzvednutí a odkládací místa. Údaje o jízdné zahrnují ceny jízdné, daně a tipové částky. Pole v obou typech záznamů zahrnují číslo medailiónu, licenci hacku a ID dodavatele. Kombinace těchto tří polí jednoznačně identifikuje taxi a řidiče. Data se ukládají ve formátu CSV.
[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois ve společnosti Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Generátor dat je aplikace .NET Core, která čte záznamy a odesílá je do služby Event Hubs. Generátor odesílá data jízdy ve formátu JSON a data jízdného ve formátu CSV.
Služba Event Hubs používá k segmentování dat oddíly . Oddíly umožňují příjemci číst každý oddíl paralelně. Když odesíláte data do služby Event Hubs, můžete klíč oddílu zadat přímo. V opačném případě se záznamy přiřazují k oddílům způsobem kruhového dotazování.
V tomto scénáři by data jízdy a jízdné měly být přiřazeny stejnému ID oddílu pro konkrétní taxislužba. Toto přiřazení umožňuje Databricks použít stupeň paralelismu, když koreluje tyto dva datové proudy. Například záznam v oddílu n dat jízdy odpovídá záznamu v oddílu n dat jízdy.
Stáhněte si soubor Visia této architektury.
V generátoru dat má společný datový model pro oba typy PartitionKey
záznamů vlastnost, která je zřetězením Medallion
, HackLicense
a VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Tato vlastnost slouží k poskytnutí explicitního klíče oddílu při odesílání dat do služby Event Hubs.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
Kapacita propustnosti služby Event Hubs se měří v jednotkách propustnosti. Centrum událostí můžete automaticky škálovat povolením automatického nafouknutí . Tato funkce automaticky škáluje jednotky propustnosti na základě provozu až na nakonfigurované maximum.
Zpracování datových proudů
V Azure Databricks provádí úloha zpracování dat. Úloha se přiřadí ke clusteru a pak na něm běží. Úloha může být vlastní kód napsaný v Javě nebo poznámkový blok Spark .
V této referenční architektuře je úloha archiv Jazyka Java, který obsahuje třídy napsané v Javě a Scala. Když zadáte archiv Javy pro úlohu Databricks, cluster Databricks určuje třídu pro operaci.
main
Zde metoda com.microsoft.pnp.TaxiCabReader
třídy obsahuje logiku zpracování dat.
Čtení datového proudu ze dvou instancí centra událostí
Logika zpracování dat používá strukturované streamování Sparku ke čtení ze dvou instancí centra událostí Azure:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Obohacení dat informacemi o sousedství
Data o jízdě zahrnují souřadnice zeměpisné šířky a délky vyzvednutí a odkládacích míst. Tyto souřadnice jsou užitečné, ale nejsou snadno využité k analýze. Proto jsou tato data rozšířena o data sousedství, která se čtou ze souboru obrazce .
Formát souboru shapefile je binární a není snadno parsován. Knihovna GeoTools ale poskytuje nástroje pro geoprostorová data, která používají formát souboru shapefile. Tato knihovna se používá v com.microsoft.pnp.GeoFinder
třídě k určení názvu sousedství na základě souřadnic pro vyzvednutí a odkládací umístění.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Připojte se k jízdě a datům o jízdě a jízdě
Nejprve se transformují data jízdy a jízdy:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Data jízdy se pak spojí s daty jízdného:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Zpracování dat a jejich vložení do služby Azure Cosmos DB
Průměrná částka jízdného pro každou čtvrť se vypočítá pro konkrétní časový interval:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Průměrná částka jízdy se pak vloží do služby Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Důležité informace
Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které můžete použít ke zlepšení kvality úlohy. Další informace naleznete v tématu Microsoft Azure Well-Architected Framework.
Zabezpečení
Zabezpečení poskytuje záruky proti záměrným útokům a zneužití cenných dat a systémů. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu prozabezpečení .
Přístup k pracovnímu prostoru Azure Databricks se řídí pomocí konzoly správce . Konzola správce obsahuje funkce pro přidání uživatelů, správu uživatelských oprávnění a nastavení jednotného přihlašování. Řízení přístupu pro pracovní prostory, clustery, úlohy a tabulky je také možné nastavit prostřednictvím konzoly správce.
Správa tajných kódů
Azure Databricks obsahuje úložiště tajných kódů, které slouží k ukládání přihlašovacích údajů a odkazování na ně v poznámkových blocích a úlohách. Rozsahy tajných kódů oddílů v úložišti tajných kódů Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Tajné kódy se přidají na úrovni oboru:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Poznámka:
Místo nativního oboru Azure Databricks použijte rozsah založený na službě Azure Key Vault.
V kódu jsou tajné kódy přístupné prostřednictvím nástrojů pro tajné kódy Azure Databricks.
Optimalizace nákladů
Optimalizace nákladů se zaměřuje na způsoby, jak snížit zbytečné výdaje a zlepšit efektivitu provozu. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu proOptimalizace nákladů .
K odhadu nákladů použijte cenovou kalkulačku Azure. Vezměte v úvahu následující služby používané v této referenční architektuře.
Důležité informace o nákladech služby Event Hubs
Tato referenční architektura nasadí službu Event Hubs na úrovni Standard. Cenový model je založený na jednotkách propustnosti, událostech příchozího přenosu dat a zachytávání událostí. Událost příchozího přenosu dat je jednotka dat, která jsou 64 kB nebo menší. Větší zprávy se účtují v násobcích po 64 kB. Jednotky propustnosti zadáte prostřednictvím webu Azure Portal nebo rozhraní API pro správu služby Event Hubs.
Pokud potřebujete více dnů uchovávání, zvažte úroveň Dedicated. Tato úroveň poskytuje nasazení s jedním tenantem, která mají přísné požadavky. Tato nabídka vytvoří cluster založený na jednotkách kapacity a není závislý na jednotkách propustnosti. Úroveň Standard se také účtuje na základě událostí příchozího přenosu dat a jednotek propustnosti.
Další informace najdete v tématu ceny služby Event Hubs.
Důležité informace o nákladech na Azure Databricks
Azure Databricks poskytuje úroveň Standard a úroveň Premium, z nichž obě podporují tři úlohy. Tato referenční architektura nasadí pracovní prostor Azure Databricks na úrovni Premium.
Úlohy přípravy dat by se měly spouštět v clusteru úloh. Datoví inženýři používají clustery k vytváření a provádění úloh. Úlohy analýzy dat by se měly spouštět v clusteru pro všechny účely a jsou určené pro datové vědce, aby mohli interaktivně zkoumat, vizualizovat, manipulovat s nimi a sdílet data a přehledy.
Azure Databricks poskytuje více cenových modelů.
plán průběžných plateb
Účtují se vám virtuální počítače zřízené v clusterech a jednotkách Azure Databricks (DBU) na základě zvolené instance virtuálního počítače. DBU je jednotka schopností zpracování, která se účtuje podle využití za sekundu. Spotřeba DBU závisí na velikosti a typu instance, která běží v Azure Databricks. Ceny závisí na zvolené úloze a úrovni.
předkupní plán
Do jednotek potvrzení Azure Databricks se zavazujete po dobu jednoho nebo tří let, abyste snížili celkové náklady na vlastnictví v daném časovém období v porovnání s modelem průběžných plateb.
Další informace najdete v tématu cenovýchAzure Databricks .
Důležité informace o nákladech služby Azure Cosmos DB
V této architektuře úloha Azure Databricks zapisuje do služby Azure Cosmos DB řadu záznamů. Účtuje se vám kapacita, kterou si rezervujete, která se měří v jednotkách žádostí za sekundu (RU/s). Tato kapacita se používá k provádění operací vložení. Jednotka pro fakturaci je 100 RU/s za hodinu. Například náklady na zápis 100 kB položek jsou 50 RU/s.
V případě operací zápisu zřiďte dostatečnou kapacitu pro podporu počtu zápisů potřebných za sekundu. Zřízenou propustnost můžete zvýšit pomocí portálu nebo Azure CLI před provedením operací zápisu a následným snížením propustnosti po dokončení těchto operací. Propustnost pro období zápisu je součet minimální propustnosti potřebné pro konkrétní data a propustnost potřebnou pro operaci vložení. Tento výpočet předpokládá, že není spuštěná žádná jiná úloha.
Příklad analýzy nákladů
Předpokládejme, že pro kontejner nakonfigurujete hodnotu propustnosti 1 000 RU/s. Je nasazená po dobu 24 hodin po dobu 30 dnů, celkem 720 hodin.
Kontejner se účtuje za 10 jednotek 100 RU/s za hodinu za každou hodinu. Deset jednotek v hodnotě 0,008 USD (za 100 RU/s za hodinu) se účtují v hodnotě 0,08 USD za hodinu.
Za 720 hodin nebo 7 200 jednotek (100 RU) se vám účtuje 57,60 USD za měsíc.
Úložiště se také účtuje za každou GB, která se používá pro uložená data a index. Další informace najdete v cenovém modelu služby Azure Cosmos DB.
K rychlému odhadu nákladů na úlohy použijte kalkulačku kapacity služby Azure Cosmos DB.
Efektivita provozu
Efektivita provozu se zabývá provozními procesy, které nasazují aplikaci a udržují ji spuštěnou v produkčním prostředí. Další informace naleznete v tématu kontrolní seznam pro kontrolu efektivity provozu.
Sledování
Azure Databricks je založen na Apache Sparku. Azure Databricks i Apache Spark používají Apache Log4j jako standardní knihovnu pro protokolování. Kromě výchozího protokolování, které Poskytuje Apache Spark, můžete implementovat protokolování v Log Analytics. Další informace najdete v tématu Monitorování Azure Databricks.
Protože třída com.microsoft.pnp.TaxiCabReader
zpracovává zprávy jízdy a jízdy, může být zpráva poškozena, a proto není platná. V produkčním prostředí je důležité analyzovat tyto poškozené zprávy a identifikovat problém se zdroji dat, aby bylo možné je rychle opravit, aby se zabránilo ztrátě dat. Třída com.microsoft.pnp.TaxiCabReader
zaregistruje akumulátor Apache Sparku, který sleduje počet poškozených záznamů jízdy a záznamů jízdy:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark používá k odesílání metrik knihovnu Dropwizard. Některá nativní pole metrik Dropwizard nejsou kompatibilní s Log Analytics, což je důvod, proč tato referenční architektura zahrnuje vlastní jímku Dropwizard a reporter. Formátuje metriky ve formátu, který Log Analytics očekává. Když Apache Spark hlásí metriky, posílají se také vlastní metriky pro špatně zformulovaná data jízdy a jízdného.
Pomocí následujících ukázkových dotazů v pracovním prostoru služby Log Analytics můžete monitorovat provoz úlohy streamování. Argument ago(1d)
v každém dotazu vrátí všechny záznamy, které byly vygenerovány za poslední den. Tento parametr můžete upravit, aby se zobrazilo jiné časové období.
Výjimky zaprotokolované během operace dotazu streamu
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Akumulace poškozených jízdných dat a dat o jízdě
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operace úlohy v průběhu času
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organizace prostředků a nasazení
Vytvořte samostatné skupiny prostředků pro produkční, vývojové a testovací prostředí. Samostatné skupiny prostředků usnadňují správu nasazení, odstraňování testovacích nasazení a přiřazování přístupových práv.
Pomocí šablony Azure Resource Manageru nasaďte prostředky Azure podle procesu infrastruktury jako kódu. Pomocí šablon můžete snadno automatizovat nasazení pomocí služeb Azure DevOps nebo jiných řešení kontinuální integrace a průběžného doručování (CI/CD).
Každou úlohu umístěte do samostatné šablony nasazení a uložte prostředky do systémů správy zdrojového kódu. Šablony můžete nasadit společně nebo jednotlivě jako součást procesu CI/CD. Tento přístup zjednodušuje proces automatizace.
V této architektuře jsou služba Event Hubs, Log Analytics a Azure Cosmos DB identifikovány jako jedna úloha. Tyto prostředky jsou součástí jedné šablony Azure Resource Manageru.
Zvažte přípravu úloh. Nasaďte je do různých fází a před přechodem do další fáze spusťte kontroly ověřování v každé fázi. Díky tomu můžete řídit způsob nabízení aktualizací do produkčních prostředí a minimalizovat neočekávané problémy s nasazením.
V této architektuře existuje několik fází nasazení. Zvažte vytvoření kanálu Azure DevOps a přidání těchto fází. Můžete automatizovat následující fáze:
- Spusťte cluster Databricks.
- Konfigurace rozhraní příkazového řádku Databricks
- Nainstalujte nástroje Scala.
- Přidejte tajné kódy Databricks.
Zvažte psaní automatizovaných integračních testů za účelem zlepšení kvality a spolehlivosti kódu Databricks a jeho životního cyklu.
Nasazení tohoto scénáře
Pokud chcete nasadit a spustit referenční implementaci, postupujte podle kroků v souboru readme GitHubu.