Przetwarzanie strumieni za pomocą usługi Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Ta architektura referencyjna przedstawia pełny potok przetwarzania strumienia. Ten typ potoku ma cztery etapy: pozyskiwanie, przetwarzanie, przechowywanie i analizowanie oraz raportowanie. W przypadku tej architektury referencyjnej potok pozyskuje dane z dwóch źródeł, wykonuje sprzężenie na powiązanych rekordach z każdego strumienia, wzbogaca wynik i oblicza średnią w czasie rzeczywistym. Wyniki są przechowywane do dalszej analizy.

Logo usługi GitHub Implementacja referencyjna dla tej architektury jest dostępna w usłudze GitHub.

Architektura

Diagram przedstawiający architekturę referencyjną przetwarzania strumieniowego za pomocą usługi Azure Databricks.

Pobierz plik programu Visio tej architektury.

Przepływ pracy

Niniejsza architektura zawiera następujące składniki:

Źródła danych. W tej architekturze istnieją dwa źródła danych, które generują strumienie danych w czasie rzeczywistym. Pierwszy strumień zawiera informacje o przejazdach, a drugi zawiera informacje o taryfie. Architektura referencyjna zawiera symulowany generator danych, który odczytuje z zestawu plików statycznych i wypycha dane do usługi Event Hubs. Źródła danych w rzeczywistej aplikacji będą urządzeniami zainstalowanymi w taksówkach.

Azure Event Hubs. Event Hubs to usługa pozyskiwania zdarzeń. Ta architektura używa dwóch wystąpień centrum zdarzeń, po jednym dla każdego źródła danych. Każde źródło danych wysyła strumień danych do skojarzonego centrum zdarzeń.

Azure Databricks. Databricks to oparta na platformie Apache Spark platforma analityczna zoptymalizowana pod kątem platformy usług w chmurze Microsoft Azure. Usługa Databricks służy do korelowania danych przejazdu taksówką i taryfy, a także do wzbogacania skorelowanych danych z danymi sąsiedztwa przechowywanymi w systemie plików usługi Databricks.

Azure Cosmos DB. Dane wyjściowe zadania usługi Azure Databricks to seria rekordów, które są zapisywane w usłudze Azure Cosmos DB dla usługi Apache Cassandra. Usługa Azure Cosmos DB dla systemu Apache Cassandra jest używana, ponieważ obsługuje modelowanie danych szeregów czasowych.

Azure Log Analytics. Dane dziennika aplikacji zebrane przez usługę Azure Monitor są przechowywane w obszarze roboczym usługi Log Analytics. Zapytania usługi Log Analytics mogą służyć do analizowania i wizualizowania metryk oraz inspekcji komunikatów dziennika w celu identyfikowania problemów w aplikacji.

Szczegóły scenariusza

Scenariusz: Firma taksówkarska zbiera dane dotyczące każdej podróży taksówką. W tym scenariuszu przyjęto założenie, że istnieją dwa oddzielne urządzenia wysyłające dane. Taksówka ma miernik, który wysyła informacje o poszczególnych przejazdach — czas trwania, odległość oraz miejsca odbioru i odbioru. Oddzielne urządzenie akceptuje płatności od klientów i wysyła dane dotyczące taryf. Aby dostrzec trendy jazdy, firma taksówkarska chce obliczyć średnią wskazówkę na milę jazdy, w czasie rzeczywistym, dla każdej dzielnicy.

Pozyskiwanie danych

Aby zasymulować źródło danych, ta architektura referencyjna korzysta z zestawu danych danych taksówek w Nowym Jorku[1]. Ten zestaw danych zawiera dane dotyczące przejazdów taksówką w Nowym Jorku w okresie czterech lat (2010 – 2013). Zawiera dwa typy rekordów: dane przejazdu i dane taryfy. Dane przejazdu obejmują czas trwania podróży, odległość podróży oraz lokalizację odbioru i odbioru. Dane taryfy obejmują opłaty, podatki i kwoty porad. Typowe pola w obu typach rekordów obejmują numer medalionu, licencję hack i identyfikator dostawcy. Razem te trzy pola jednoznacznie identyfikują taksówkę i kierowcę. Dane są przechowywane w formacie CSV.

[1] Donovan, Brian; Praca, Dan (2016): New York City Taxi Trip Data (2010-2013). Uniwersytet Illinois w Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Generator danych to aplikacja platformy .NET Core, która odczytuje rekordy i wysyła je do usługi Azure Event Hubs. Generator wysyła dane przejazdu w formacie JSON i taryfy w formacie CSV.

Usługa Event Hubs używa partycji do segmentowania danych. Partycje umożliwiają użytkownikowi równoległe odczytywanie każdej partycji. Podczas wysyłania danych do usługi Event Hubs można jawnie określić klucz partycji. W przeciwnym razie rekordy są przypisywane do partycji w sposób okrężny.

W tym scenariuszu dane przejazdu i dane taryfy powinny zawierać ten sam identyfikator partycji dla danej taksówki. Dzięki temu usługa Databricks może zastosować stopień równoległości, gdy skoreluje te dwa strumienie. Rekord w partycji n danych przejazdu będzie pasował do rekordu w partycji n danych taryfy.

Diagram przetwarzania strumieniowego za pomocą usług Azure Databricks i Event Hubs.

Pobierz plik programu Visio tej architektury.

W generatorze danych wspólny model danych dla obu typów rekordów ma PartitionKey właściwość, która jest łączeniem Medallion, HackLicensei VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Ta właściwość służy do udostępniania jawnego klucza partycji podczas wysyłania do usługi Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Pojemność przepływności usługi Event Hubs jest mierzona w jednostkach przepływności. Możesz automatycznie skalować centrum zdarzeń, włączając automatyczne rozszerzanie, które automatycznie skaluje jednostki przepływności na podstawie ruchu, do skonfigurowanej maksymalnej wartości.

Przetwarzanie strumieniowe

W usłudze Azure Databricks przetwarzanie danych jest wykonywane przez zadanie. Zadanie jest przypisywane do klastra i uruchamiane w klastrze. Zadanie może być kodem niestandardowym napisanym w języku Java lub notesem platformy Spark.

W tej architekturze referencyjnej zadanie jest archiwum Języka Java z klasami napisanymi zarówno w języku Java, jak i w języku Scala. Podczas określania archiwum Java dla zadania usługi Databricks klasa jest określana do wykonania przez klaster usługi Databricks. main Tutaj metoda com.microsoft.pnp.TaxiCabReader klasy zawiera logikę przetwarzania danych.

Odczytywanie strumienia z dwóch wystąpień centrum zdarzeń

Logika przetwarzania danych używa przesyłania strumieniowego ze strukturą platformy Spark do odczytu z dwóch wystąpień centrum zdarzeń platformy Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Wzbogacanie danych za pomocą informacji o sąsiedztwie

Dane przejazdu obejmują współrzędne geograficzne i współrzędne geograficzne lokalizacji odbioru i upuszczania. Chociaż te współrzędne są przydatne, nie są łatwo używane do analizy. W związku z tym te dane są wzbogacone danymi sąsiedztwa odczytywanymi z pliku shape.

Format pliku shape jest binarny i nie jest łatwo analizowany, ale biblioteka GeoTools udostępnia narzędzia do danych geoprzestrzennych korzystających z formatu shapefile. Ta biblioteka jest używana w com.microsoft.pnp.GeoFinder klasie do określania nazwy sąsiedztwa na podstawie współrzędnych odbioru i upuszczania.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Dołączanie do danych przejazdu i taryfy

Najpierw dane dotyczące przejazdu i taryfy są przekształcane:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Następnie dane dotyczące przejazdu są łączone z danymi taryfy:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Przetwarzanie danych i wstawianie do usługi Azure Cosmos DB

Średnia kwota taryfy dla każdej dzielnicy jest obliczana dla danego przedziału czasu:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Następnie wstawiono je do usługi Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Kwestie wymagające rozważenia

Te zagadnienia implementują filary struktury Azure Well-Architected Framework, która jest zestawem wytycznych, które mogą służyć do poprawy jakości obciążenia. Aby uzyskać więcej informacji, zobacz Microsoft Azure Well-Architected Framework.

Zabezpieczenia

Zabezpieczenia zapewniają ochronę przed celowymi atakami i nadużyciami cennych danych i systemów. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotyczącazabezpieczeń.

Dostęp do obszaru roboczego usługi Azure Databricks jest kontrolowany przy użyciu konsoli administratora konsola administratora zawiera funkcje dodawania użytkowników, zarządzania uprawnieniami użytkowników i konfigurowania logowania jednokrotnego. Kontrola dostępu dla obszarów roboczych, klastrów, zadań i tabel można również ustawić za pomocą konsoli administratora.

Zarządzanie wpisami tajnymi

Usługa Azure Databricks zawiera magazyn wpisów tajnych używany do przechowywania poświadczeń i odwoływać się do nich w notesach i zadaniach. Wpisy tajne w magazynie wpisów tajnych usługi Azure Databricks są partycjonowane według zakresów:

databricks secrets create-scope --scope "azure-databricks-job"

Wpisy tajne są dodawane na poziomie zakresu:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Uwaga

Zakres oparty na usłudze Azure Key Vault powinien być używany zamiast natywnego zakresu usługi Azure Databricks. Aby dowiedzieć się więcej, zobacz Zakresy oparte na usłudze Azure Key Vault.

W kodzie dostęp do wpisów tajnych jest uzyskiwany za pośrednictwem narzędzi wpisów tajnych usługi Azure Databricks.

Optymalizacja kosztów

Optymalizacja kosztów dotyczy sposobów zmniejszenia niepotrzebnych wydatków i poprawy wydajności operacyjnej. Aby uzyskać więcej informacji, zobacz Omówienie filaru optymalizacji kosztów.

Koszty możesz szacować za pomocą kalkulatora cen platformy Azure. Poniżej przedstawiono niektóre zagadnienia dotyczące usług używanych w tej architekturze referencyjnej.

Zagadnienia dotyczące kosztów usługi Event Hubs

Ta architektura referencyjna wdraża usługę Event Hubs w warstwie Standardowa . Model cen jest oparty na jednostkach przepływności, zdarzeniach ruchu przychodzącego i przechwytywaniu zdarzeń. Zdarzenie związane z transferem danych przychodzących to jednostka danych o rozmiarze nie większym niż 64 KB. Większe komunikaty są rozliczane jako wielokrotność 64 KB. Jednostki przepływności można określić za pośrednictwem witryny Azure Portal lub interfejsów API zarządzania usługi Event Hubs.

Jeśli potrzebujesz więcej dni przechowywania, rozważ warstwę Dedykowana. Ta warstwa oferuje wdrożenia z jedną dzierżawą z najbardziej wymagającymi wymaganiami. Ta oferta tworzy klaster na podstawie jednostek pojemności (CU), które nie są powiązane z jednostkami przepływności.

Warstwa Standardowa jest również rozliczana na podstawie zdarzeń ruchu przychodzącego i jednostek przepływności.

Aby uzyskać informacje o cenach usługi Event Hubs, zobacz cennik usługi Event Hubs.

Zagadnienia dotyczące kosztów usługi Azure Databricks

Usługa Azure Databricks oferuje dwie warstwy w warstwie Standardowa i Premium , z których każda obsługuje trzy obciążenia. Ta architektura referencyjna wdraża obszar roboczy usługi Azure Databricks w warstwie Premium .

Obciążenia inżynieryjne danych powinny być uruchamiane w klastrze zadań i są przeznaczone dla inżynierów danych do kompilowania i wykonywania zadań. Obciążenia analizy danych powinny być uruchamiane w klastrze ogólnego przeznaczenia i przeznaczone dla analityków danych do interaktywnego eksplorowania, wizualizowania, manipulowania nimi i udostępniania danych i szczegółowych informacji.

Usługa Azure Databricks oferuje wiele modeli cenowych.

  • Plan płatności zgodnie z rzeczywistym użyciem

    Opłaty są naliczane za maszyny wirtualne aprowidowane w klastrach i jednostkach usługi Databricks (DBU) na podstawie wybranego wystąpienia maszyny wirtualnej. Jednostka DBU to jednostka możliwości przetwarzania rozliczana za użycie na sekundę. Użycie jednostek DBU zależy od rozmiaru i typu wystąpienia z uruchomioną usługą Azure Databricks. Ceny zależą od wybranego obciążenia i warstwy.

  • Plan przed zakupem

    Możesz zatwierdzić jednostki usługi Azure Databricks (DBU) jako jednostki zatwierdzeń usługi Databricks (DBCU) przez jeden lub trzy lata, aby zmniejszyć całkowity koszt posiadania w tym okresie w porównaniu z modelem płatności zgodnie z rzeczywistym użyciem.

Aby uzyskać więcej informacji, zobacz Cennik usługi Azure Databricks.

Zagadnienia dotyczące kosztów usługi Azure Cosmos DB

W tej architekturze seria rekordów jest zapisywana w usłudze Azure Cosmos DB przez zadanie usługi Azure Databricks. Opłaty są naliczane za rezerwową pojemność wyrażoną w jednostkach żądań na sekundę (RU/s), używanej do wykonywania operacji wstawiania. Jednostka rozliczeń wynosi 100 RU/s na godzinę. Na przykład koszt pisania 100 KB elementów wynosi 50 RU/s.

W przypadku operacji zapisu aprowizuj wystarczającą pojemność, aby obsługiwać liczbę operacji zapisu na sekundę. Aprowizowaną przepływność można zwiększyć przy użyciu portalu lub interfejsu wiersza polecenia platformy Azure przed wykonaniem operacji zapisu, a następnie zmniejszyć przepływność po zakończeniu tych operacji. Przepływność okresu zapisu jest minimalną przepływnością wymaganą dla danych oraz przepływnością wymaganą dla operacji wstawiania przy założeniu, że żadne inne obciążenie nie jest uruchomione.

Przykładowa analiza kosztów

Załóżmy, że skonfigurujesz wartość przepływności 1000 RU/s w kontenerze. Jest on wdrażany przez 24 godziny przez 30 dni, łącznie 720 godzin.

Opłaty za kontener są naliczane na 10 jednostek 100 RU/s na godzinę dla każdej godziny. 10 jednostek na 0,008 USD (za 100 RU/s na godzinę) są naliczane opłaty w wysokości 0,08 USD za godzinę.

W przypadku 720 godzin lub 7200 jednostek (z 100 jednostek RU) opłaty są naliczane w wysokości 57,60 USD za miesiąc.

Opłaty za magazyn są również naliczane za każdy GB używany dla przechowywanych danych i indeksu. Aby uzyskać więcej informacji, zobacz Model cen usługi Azure Cosmos DB.

Skorzystaj z kalkulatora pojemności usługi Azure Cosmos DB, aby uzyskać szybkie oszacowanie kosztów obciążenia.

Doskonałość operacyjna

Doskonałość operacyjna obejmuje procesy operacyjne, które wdrażają aplikację i działają w środowisku produkcyjnym. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotycząca doskonałości operacyjnej.

Monitorowanie

Usługa Azure Databricks jest oparta na platformie Apache Spark i używa biblioteki log4j jako standardowej biblioteki do rejestrowania. Oprócz domyślnego rejestrowania udostępnianego przez platformę Apache Spark można zaimplementować rejestrowanie w usłudze Azure Log Analytics, postępując zgodnie z artykułem Monitorowanie usługi Azure Databricks.

com.microsoft.pnp.TaxiCabReader Ponieważ klasa przetwarza komunikaty jazdy i taryfy, możliwe, że jeden z nich może być źle sformułowany i dlatego nie jest prawidłowy. W środowisku produkcyjnym ważne jest analizowanie tych źle sformułowanych komunikatów w celu zidentyfikowania problemu ze źródłami danych, dzięki czemu można je szybko naprawić, aby zapobiec utracie danych. Klasa com.microsoft.pnp.TaxiCabReader rejestruje akumulator platformy Apache Spark, który śledzi liczbę nieprawidłowo sformułowanych biletów i przejazdów:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Platforma Apache Spark używa biblioteki Dropwizard do wysyłania metryk, a niektóre z natywnych pól metryk Dropwizard są niezgodne z usługą Azure Log Analytics. W związku z tym ta architektura referencyjna obejmuje niestandardowe ujście i reportera Dropwizard. Formatuje metryki w formacie oczekiwanym przez usługę Azure Log Analytics. W przypadku raportowania metryk platformy Apache Spark wysyłane są również metryki niestandardowe dla źle sformułowanych danych przejazdu i taryfy.

Poniżej przedstawiono przykładowe zapytania, których można użyć w obszarze roboczym usługi Azure Log Analytics do monitorowania wykonywania zadania przesyłania strumieniowego. Argument ago(1d) w każdym zapytaniu zwróci wszystkie rekordy, które zostały wygenerowane w ciągu ostatniego dnia i można je dostosować, aby wyświetlić inny okres.

Wyjątki rejestrowane podczas wykonywania zapytań strumienia

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Akumulacja źle sformułowanych taryf i danych przejazdu

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Wykonywanie zadania w czasie

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Aby uzyskać więcej informacji, zobacz Monitorowanie usługi Azure Databricks.

Organizacja zasobów i wdrożenia

  • Utwórz oddzielne grupy zasobów dla środowisk produkcyjnych, programistycznych i testowych. Oddzielne grupy zasobów ułatwiają zarządzanie wdrożeniami, usuwanie wdrożeń testowych i przypisywanie praw dostępu.

  • Użyj szablonu usługi Azure Resource Manager, aby wdrożyć zasoby platformy Azure zgodnie z procesem infrastruktury jako kodu (IaC). Dzięki szablonom automatyzacja wdrożeń przy użyciu usług Azure DevOps Services lub innych rozwiązań ciągłej integracji/ciągłego wdrażania jest łatwiejsza.

  • Umieść każde obciążenie w osobnym szablonie wdrożenia i zapisz zasoby w systemach kontroli źródła. Szablony można wdrażać razem lub indywidualnie w ramach procesu ciągłej integracji/ciągłego wdrażania, co ułatwia proces automatyzacji.

    W tej architekturze usługi Azure Event Hubs, Log Analytics i Azure Cosmos DB są identyfikowane jako pojedyncze obciążenie. Te zasoby są uwzględniane w jednym szablonie usługi ARM.

  • Rozważ przemieszczanie obciążeń. Wdróż na różnych etapach i uruchom testy weryfikacyjne na każdym etapie przed przejściem do następnego etapu. Dzięki temu można wypychać aktualizacje do środowisk produkcyjnych w sposób wysoce kontrolowany i zminimalizować nieprzewidziane problemy z wdrażaniem.

    W tej architekturze istnieje wiele etapów wdrażania. Rozważ utworzenie potoku usługi Azure DevOps i dodanie tych etapów. Oto kilka przykładów etapów, które można zautomatyzować:

    • Uruchamianie klastra usługi Databricks
    • Konfigurowanie interfejsu wiersza polecenia usługi Databricks
    • Instalowanie narzędzi Języka Scala
    • Dodawanie wpisów tajnych usługi Databricks

    Należy również rozważyć napisanie zautomatyzowanych testów integracji w celu poprawy jakości i niezawodności kodu usługi Databricks oraz jego cyklu życia.

Wdrażanie tego scenariusza

Aby wdrożyć i uruchomić implementację referencyjną, wykonaj kroki opisane w pliku readme usługi GitHub.

Następny krok