Ta architektura referencyjna przedstawia pełny potok przetwarzania strumienia. Ten typ potoku ma cztery etapy: pozyskiwanie, przetwarzanie, przechowywanie i analizowanie oraz raportowanie. W przypadku tej architektury referencyjnej potok pozyskuje dane z dwóch źródeł, wykonuje sprzężenie na powiązanych rekordach z każdego strumienia, wzbogaca wynik i oblicza średnią w czasie rzeczywistym. Wyniki są przechowywane do dalszej analizy.
Implementacja referencyjna dla tej architektury jest dostępna w usłudze GitHub.
Architektura
Pobierz plik programu Visio tej architektury.
Przepływ pracy
Niniejsza architektura zawiera następujące składniki:
Źródła danych. W tej architekturze istnieją dwa źródła danych, które generują strumienie danych w czasie rzeczywistym. Pierwszy strumień zawiera informacje o przejazdach, a drugi zawiera informacje o taryfie. Architektura referencyjna zawiera symulowany generator danych, który odczytuje z zestawu plików statycznych i wypycha dane do usługi Event Hubs. Źródła danych w rzeczywistej aplikacji będą urządzeniami zainstalowanymi w taksówkach.
Azure Event Hubs. Event Hubs to usługa pozyskiwania zdarzeń. Ta architektura używa dwóch wystąpień centrum zdarzeń, po jednym dla każdego źródła danych. Każde źródło danych wysyła strumień danych do skojarzonego centrum zdarzeń.
Azure Databricks. Databricks to oparta na platformie Apache Spark platforma analityczna zoptymalizowana pod kątem platformy usług w chmurze Microsoft Azure. Usługa Databricks służy do korelowania danych przejazdu taksówką i taryfy, a także do wzbogacania skorelowanych danych z danymi sąsiedztwa przechowywanymi w systemie plików usługi Databricks.
Azure Cosmos DB. Dane wyjściowe zadania usługi Azure Databricks to seria rekordów, które są zapisywane w usłudze Azure Cosmos DB dla usługi Apache Cassandra. Usługa Azure Cosmos DB dla systemu Apache Cassandra jest używana, ponieważ obsługuje modelowanie danych szeregów czasowych.
Usługa Azure Synapse Link dla usługi Azure Cosmos DB umożliwia uruchamianie analizy niemal w czasie rzeczywistym na danych operacyjnych w usłudze Azure Cosmos DB bez wpływu na wydajność i koszt obciążenia transakcyjnego przy użyciu dwóch aparatów analitycznych dostępnych w obszarze roboczym usługi Azure Synapse: SQL Serverless i Spark Pools.
usługa Microsoft Fabric Dublowanie usługi Azure Cosmos DB for NoSQL umożliwia integrację danych usługi Azure Cosmos DB z resztą danych w usłudze Microsoft Fabric.
Azure Log Analytics. Dane dziennika aplikacji zebrane przez usługę Azure Monitor są przechowywane w obszarze roboczym usługi Log Analytics. Zapytania usługi Log Analytics mogą służyć do analizowania i wizualizowania metryk oraz inspekcji komunikatów dziennika w celu identyfikowania problemów w aplikacji.
Szczegóły scenariusza
Scenariusz: Firma taksówkarska zbiera dane dotyczące każdej podróży taksówką. W tym scenariuszu przyjęto założenie, że istnieją dwa oddzielne urządzenia wysyłające dane. Taksówka ma miernik, który wysyła informacje o poszczególnych przejazdach — czas trwania, odległość oraz miejsca odbioru i odbioru. Oddzielne urządzenie akceptuje płatności od klientów i wysyła dane dotyczące taryf. Aby dostrzec trendy jazdy, firma taksówkarska chce obliczyć średnią wskazówkę na milę jazdy, w czasie rzeczywistym, dla każdej dzielnicy.
Pozyskiwanie danych
Aby zasymulować źródło danych, ta architektura referencyjna korzysta z zestawu danych danych taksówek w Nowym Jorku[1]. Ten zestaw danych zawiera dane dotyczące przejazdów taksówką w Nowym Jorku w okresie czterech lat (2010 – 2013). Zawiera dwa typy rekordów: dane przejazdu i dane taryfy. Dane przejazdu obejmują czas trwania podróży, odległość podróży oraz lokalizację odbioru i odbioru. Dane taryfy obejmują opłaty, podatki i kwoty porad. Typowe pola w obu typach rekordów obejmują numer medalionu, licencję hack i identyfikator dostawcy. Razem te trzy pola jednoznacznie identyfikują taksówkę i kierowcę. Dane są przechowywane w formacie CSV.
[1] Donovan, Brian; Praca, Dan (2016): New York City Taxi Trip Data (2010-2013). Uniwersytet Illinois w Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Generator danych to aplikacja platformy .NET Core, która odczytuje rekordy i wysyła je do usługi Azure Event Hubs. Generator wysyła dane przejazdu w formacie JSON i taryfy w formacie CSV.
Usługa Event Hubs używa partycji do segmentowania danych. Partycje umożliwiają użytkownikowi równoległe odczytywanie każdej partycji. Podczas wysyłania danych do usługi Event Hubs można jawnie określić klucz partycji. W przeciwnym razie rekordy są przypisywane do partycji w sposób okrężny.
W tym scenariuszu dane przejazdu i dane taryfy powinny zawierać ten sam identyfikator partycji dla danej taksówki. Dzięki temu usługa Databricks może zastosować stopień równoległości, gdy skoreluje te dwa strumienie. Rekord w partycji n danych przejazdu będzie pasował do rekordu w partycji n danych taryfy.
Pobierz plik programu Visio tej architektury.
W generatorze danych wspólny model danych dla obu typów rekordów ma PartitionKey
właściwość, która jest łączeniem Medallion
, HackLicense
i VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Ta właściwość służy do udostępniania jawnego klucza partycji podczas wysyłania do usługi Event Hubs:
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
Pojemność przepływności usługi Event Hubs jest mierzona w jednostkach przepływności. Możesz automatycznie skalować centrum zdarzeń, włączając automatyczne rozszerzanie, które automatycznie skaluje jednostki przepływności na podstawie ruchu, do skonfigurowanej maksymalnej wartości.
Przetwarzanie strumieniowe
W usłudze Azure Databricks przetwarzanie danych jest wykonywane przez zadanie. Zadanie jest przypisywane do klastra i uruchamiane w klastrze. Zadanie może być kodem niestandardowym napisanym w języku Java lub notesem platformy Spark.
W tej architekturze referencyjnej zadanie jest archiwum Języka Java z klasami napisanymi zarówno w języku Java, jak i w języku Scala. Podczas określania archiwum Java dla zadania usługi Databricks klasa jest określana do wykonania przez klaster usługi Databricks.
main
Tutaj metoda com.microsoft.pnp.TaxiCabReader
klasy zawiera logikę przetwarzania danych.
Odczytywanie strumienia z dwóch wystąpień centrum zdarzeń
Logika przetwarzania danych używa przesyłania strumieniowego ze strukturą platformy Spark do odczytu z dwóch wystąpień centrum zdarzeń platformy Azure:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Wzbogacanie danych za pomocą informacji o sąsiedztwie
Dane przejazdu obejmują współrzędne geograficzne i współrzędne geograficzne lokalizacji odbioru i upuszczania. Chociaż te współrzędne są przydatne, nie są łatwo używane do analizy. W związku z tym te dane są wzbogacone danymi sąsiedztwa odczytywanymi z pliku shape.
Format pliku shape jest binarny i nie jest łatwo analizowany, ale biblioteka GeoTools udostępnia narzędzia do danych geoprzestrzennych korzystających z formatu shapefile. Ta biblioteka jest używana w com.microsoft.pnp.GeoFinder
klasie do określania nazwy sąsiedztwa na podstawie współrzędnych odbioru i upuszczania.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Dołączanie do danych przejazdu i taryfy
Najpierw dane dotyczące przejazdu i taryfy są przekształcane:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Następnie dane dotyczące przejazdu są łączone z danymi taryfy:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Przetwarzanie danych i wstawianie do usługi Azure Cosmos DB
Średnia kwota taryfy dla każdej dzielnicy jest obliczana dla danego przedziału czasu:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Następnie wstawiono je do usługi Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Kwestie wymagające rozważenia
Te zagadnienia implementują filary struktury Azure Well-Architected Framework, która jest zestawem wytycznych, które mogą służyć do poprawy jakości obciążenia. Aby uzyskać więcej informacji, zobacz Microsoft Azure Well-Architected Framework.
Zabezpieczenia
Zabezpieczenia zapewniają ochronę przed celowymi atakami i nadużyciami cennych danych i systemów. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotyczącazabezpieczeń.
Dostęp do obszaru roboczego usługi Azure Databricks jest kontrolowany przy użyciu konsoli administratora
Zarządzanie wpisami tajnymi
Usługa Azure Databricks zawiera magazyn wpisów tajnych używany do przechowywania poświadczeń i odwoływać się do nich w notesach i zadaniach. Wpisy tajne w magazynie wpisów tajnych usługi Azure Databricks są partycjonowane według zakresów:
databricks secrets create-scope --scope "azure-databricks-job"
Wpisy tajne są dodawane na poziomie zakresu:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Uwaga
Zakres oparty na usłudze Azure Key Vault powinien być używany zamiast natywnego zakresu usługi Azure Databricks. Aby dowiedzieć się więcej, zobacz Zakresy oparte na usłudze Azure Key Vault.
W kodzie dostęp do wpisów tajnych jest uzyskiwany za pośrednictwem narzędzi wpisów tajnych usługi Azure Databricks.
Optymalizacja kosztów
Optymalizacja kosztów dotyczy sposobów zmniejszenia niepotrzebnych wydatków i poprawy wydajności operacyjnej. Aby uzyskać więcej informacji, zobacz Omówienie filaru optymalizacji kosztów.
Koszty możesz szacować za pomocą kalkulatora cen platformy Azure. Poniżej przedstawiono niektóre zagadnienia dotyczące usług używanych w tej architekturze referencyjnej.
Zagadnienia dotyczące kosztów usługi Event Hubs
Ta architektura referencyjna wdraża usługę Event Hubs w warstwie Standardowa . Model cen jest oparty na jednostkach przepływności, zdarzeniach ruchu przychodzącego i przechwytywaniu zdarzeń. Zdarzenie związane z transferem danych przychodzących to jednostka danych o rozmiarze nie większym niż 64 KB. Większe komunikaty są rozliczane jako wielokrotność 64 KB. Jednostki przepływności można określić za pośrednictwem witryny Azure Portal lub interfejsów API zarządzania usługi Event Hubs.
Jeśli potrzebujesz więcej dni przechowywania, rozważ warstwę Dedykowana. Ta warstwa oferuje wdrożenia z jedną dzierżawą z najbardziej wymagającymi wymaganiami. Ta oferta tworzy klaster na podstawie jednostek pojemności (CU), które nie są powiązane z jednostkami przepływności.
Warstwa Standardowa jest również rozliczana na podstawie zdarzeń ruchu przychodzącego i jednostek przepływności.
Aby uzyskać informacje o cenach usługi Event Hubs, zobacz cennik usługi Event Hubs.
Zagadnienia dotyczące kosztów usługi Azure Databricks
Usługa Azure Databricks oferuje dwie warstwy w warstwie Standardowa i Premium , z których każda obsługuje trzy obciążenia. Ta architektura referencyjna wdraża obszar roboczy usługi Azure Databricks w warstwie Premium .
Obciążenia inżynieryjne danych powinny być uruchamiane w klastrze zadań i są przeznaczone dla inżynierów danych do kompilowania i wykonywania zadań. Obciążenia analizy danych powinny być uruchamiane w klastrze ogólnego przeznaczenia i przeznaczone dla analityków danych do interaktywnego eksplorowania, wizualizowania, manipulowania nimi i udostępniania danych i szczegółowych informacji.
Usługa Azure Databricks oferuje wiele modeli cenowych.
Plan płatności zgodnie z rzeczywistym użyciem
Opłaty są naliczane za maszyny wirtualne aprowidowane w klastrach i jednostkach usługi Databricks (DBU) na podstawie wybranego wystąpienia maszyny wirtualnej. Jednostka DBU to jednostka możliwości przetwarzania rozliczana za użycie na sekundę. Użycie jednostek DBU zależy od rozmiaru i typu wystąpienia z uruchomioną usługą Azure Databricks. Ceny zależą od wybranego obciążenia i warstwy.
Plan przed zakupem
Możesz zatwierdzić jednostki usługi Azure Databricks (DBU) jako jednostki zatwierdzeń usługi Databricks (DBCU) przez jeden lub trzy lata, aby zmniejszyć całkowity koszt posiadania w tym okresie w porównaniu z modelem płatności zgodnie z rzeczywistym użyciem.
Aby uzyskać więcej informacji, zobacz Cennik usługi Azure Databricks.
Zagadnienia dotyczące kosztów usługi Azure Cosmos DB
W tej architekturze seria rekordów jest zapisywana w usłudze Azure Cosmos DB przez zadanie usługi Azure Databricks. Opłaty są naliczane za rezerwową pojemność wyrażoną w jednostkach żądań na sekundę (RU/s), używanej do wykonywania operacji wstawiania. Jednostka rozliczeń wynosi 100 RU/s na godzinę. Na przykład koszt pisania 100 KB elementów wynosi 50 RU/s.
W przypadku operacji zapisu aprowizuj wystarczającą pojemność, aby obsługiwać liczbę operacji zapisu na sekundę. Aprowizowaną przepływność można zwiększyć przy użyciu portalu lub interfejsu wiersza polecenia platformy Azure przed wykonaniem operacji zapisu, a następnie zmniejszyć przepływność po zakończeniu tych operacji. Przepływność okresu zapisu jest minimalną przepływnością wymaganą dla danych oraz przepływnością wymaganą dla operacji wstawiania przy założeniu, że żadne inne obciążenie nie jest uruchomione.
Przykładowa analiza kosztów
Załóżmy, że skonfigurujesz wartość przepływności 1000 RU/s w kontenerze. Jest on wdrażany przez 24 godziny przez 30 dni, łącznie 720 godzin.
Opłaty za kontener są naliczane na 10 jednostek 100 RU/s na godzinę dla każdej godziny. 10 jednostek na 0,008 USD (za 100 RU/s na godzinę) są naliczane opłaty w wysokości 0,08 USD za godzinę.
W przypadku 720 godzin lub 7200 jednostek (z 100 jednostek RU) opłaty są naliczane w wysokości 57,60 USD za miesiąc.
Opłaty za magazyn są również naliczane za każdy GB używany dla przechowywanych danych i indeksu. Aby uzyskać więcej informacji, zobacz Model cen usługi Azure Cosmos DB.
Skorzystaj z kalkulatora pojemności usługi Azure Cosmos DB, aby uzyskać szybkie oszacowanie kosztów obciążenia.
Doskonałość operacyjna
Doskonałość operacyjna obejmuje procesy operacyjne, które wdrażają aplikację i działają w środowisku produkcyjnym. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotycząca doskonałości operacyjnej.
Monitorowanie
Usługa Azure Databricks jest oparta na platformie Apache Spark i używa biblioteki log4j jako standardowej biblioteki do rejestrowania. Oprócz domyślnego rejestrowania udostępnianego przez platformę Apache Spark można zaimplementować rejestrowanie w usłudze Azure Log Analytics, postępując zgodnie z artykułem Monitorowanie usługi Azure Databricks.
com.microsoft.pnp.TaxiCabReader
Ponieważ klasa przetwarza komunikaty jazdy i taryfy, możliwe, że jeden z nich może być źle sformułowany i dlatego nie jest prawidłowy. W środowisku produkcyjnym ważne jest analizowanie tych źle sformułowanych komunikatów w celu zidentyfikowania problemu ze źródłami danych, dzięki czemu można je szybko naprawić, aby zapobiec utracie danych. Klasa com.microsoft.pnp.TaxiCabReader
rejestruje akumulator platformy Apache Spark, który śledzi liczbę nieprawidłowo sformułowanych biletów i przejazdów:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Platforma Apache Spark używa biblioteki Dropwizard do wysyłania metryk, a niektóre z natywnych pól metryk Dropwizard są niezgodne z usługą Azure Log Analytics. W związku z tym ta architektura referencyjna obejmuje niestandardowe ujście i reportera Dropwizard. Formatuje metryki w formacie oczekiwanym przez usługę Azure Log Analytics. W przypadku raportowania metryk platformy Apache Spark wysyłane są również metryki niestandardowe dla źle sformułowanych danych przejazdu i taryfy.
Poniżej przedstawiono przykładowe zapytania, których można użyć w obszarze roboczym usługi Azure Log Analytics do monitorowania wykonywania zadania przesyłania strumieniowego. Argument ago(1d)
w każdym zapytaniu zwróci wszystkie rekordy, które zostały wygenerowane w ciągu ostatniego dnia i można je dostosować, aby wyświetlić inny okres.
Wyjątki rejestrowane podczas wykonywania zapytań strumienia
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Akumulacja źle sformułowanych taryf i danych przejazdu
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Wykonywanie zadania w czasie
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Aby uzyskać więcej informacji, zobacz Monitorowanie usługi Azure Databricks.
Organizacja zasobów i wdrożenia
Utwórz oddzielne grupy zasobów dla środowisk produkcyjnych, programistycznych i testowych. Oddzielne grupy zasobów ułatwiają zarządzanie wdrożeniami, usuwanie wdrożeń testowych i przypisywanie praw dostępu.
Użyj szablonu usługi Azure Resource Manager, aby wdrożyć zasoby platformy Azure zgodnie z procesem infrastruktury jako kodu (IaC). Dzięki szablonom automatyzacja wdrożeń przy użyciu usług Azure DevOps Services lub innych rozwiązań ciągłej integracji/ciągłego wdrażania jest łatwiejsza.
Umieść każde obciążenie w osobnym szablonie wdrożenia i zapisz zasoby w systemach kontroli źródła. Szablony można wdrażać razem lub indywidualnie w ramach procesu ciągłej integracji/ciągłego wdrażania, co ułatwia proces automatyzacji.
W tej architekturze usługi Azure Event Hubs, Log Analytics i Azure Cosmos DB są identyfikowane jako pojedyncze obciążenie. Te zasoby są uwzględniane w jednym szablonie usługi ARM.
Rozważ przemieszczanie obciążeń. Wdróż na różnych etapach i uruchom testy weryfikacyjne na każdym etapie przed przejściem do następnego etapu. Dzięki temu można wypychać aktualizacje do środowisk produkcyjnych w sposób wysoce kontrolowany i zminimalizować nieprzewidziane problemy z wdrażaniem.
W tej architekturze istnieje wiele etapów wdrażania. Rozważ utworzenie potoku usługi Azure DevOps i dodanie tych etapów. Oto kilka przykładów etapów, które można zautomatyzować:
- Uruchamianie klastra usługi Databricks
- Konfigurowanie interfejsu wiersza polecenia usługi Databricks
- Instalowanie narzędzi Języka Scala
- Dodawanie wpisów tajnych usługi Databricks
Należy również rozważyć napisanie zautomatyzowanych testów integracji w celu poprawy jakości i niezawodności kodu usługi Databricks oraz jego cyklu życia.
Wdrażanie tego scenariusza
Aby wdrożyć i uruchomić implementację referencyjną, wykonaj kroki opisane w pliku readme usługi GitHub.