Przetwarzanie strumieni za pomocą usługi Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Ta architektura referencyjna przedstawia pełny potok przetwarzania strumienia. Cztery etapy tego potoku są pozyskiwane, przetwarzane, przechowywane i analizowane i raportowane. W przypadku tej architektury referencyjnej potok pozyskuje dane z dwóch źródeł, wykonuje sprzężenie na powiązanych rekordach z każdego strumienia, wzbogaca wynik i oblicza średnią w czasie rzeczywistym. Wyniki są następnie przechowywane do dalszej analizy.

Logo usługi GitHub Implementacja referencyjna dla tej architektury jest dostępna w usłudze GitHub.

Architektura

Diagram przedstawiający architekturę referencyjną przetwarzania strumieniowego za pomocą usługi Azure Databricks.

Pobierz plik programu Visio dla tej architektury.

Przepływ pracy

Poniższy przepływ danych odpowiada poprzedniemu diagramowi:

  1. W tej architekturze istnieją dwa źródła danych, które generują strumienie danych w czasie rzeczywistym. Pierwszy strumień zawiera informacje o przejazdach, a drugi strumień zawiera informacje o taryfie. Architektura referencyjna zawiera symulowany generator danych, który odczytuje z zestawu plików statycznych i wypycha dane do usługi Azure Event Hubs. Źródła danych w rzeczywistej aplikacji to urządzenia zainstalowane w taksówkach.

  2. usługi Event Hubs jest usługą pozyskiwania zdarzeń. Ta architektura używa dwóch wystąpień centrum zdarzeń, po jednym dla każdego źródła danych. Każde źródło danych wysyła strumień danych do skojarzonego centrum zdarzeń.

  3. azure Databricks to platforma analityczna oparta na platformie Apache Spark zoptymalizowana pod kątem platformy usług w chmurze Microsoft Azure. Usługa Azure Databricks służy do korelowania danych przejazdu taksówką i taryfy oraz wzbogacania skorelowanych danych z danymi sąsiedztwa przechowywanymi w systemie plików usługi Azure Databricks.

  4. usługi Azure Cosmos DB jest w pełni zarządzaną usługą bazy danych z wieloma modelami. Dane wyjściowe zadania usługi Azure Databricks to seria rekordów, które są zapisywane w usłudze Azure Cosmos DB dla usługi Apache Cassandra. Usługa Azure Cosmos DB dla systemu Apache Cassandra jest używana, ponieważ obsługuje modelowanie danych szeregów czasowych.

  5. log analytics to narzędzie w usłudze Azure Monitor, które umożliwia wykonywanie zapytań i analizowanie danych dziennika z różnych źródeł. Dane dziennika aplikacji zbierane usługi Azure Monitor są przechowywane w obszarze roboczym usługi Log Analytics . Za pomocą zapytań usługi Log Analytics można analizować i wizualizować metryki oraz sprawdzać komunikaty dziennika w celu identyfikowania problemów w aplikacji.

Szczegóły scenariusza

Firma taksówkarska zbiera dane dotyczące każdej podróży taksówką. W tym scenariuszu przyjęto założenie, że dwa oddzielne urządzenia wysyłają dane. Taksówka ma miernik, który wysyła informacje o poszczególnych przejazdach, w tym czas trwania, odległość oraz miejsca odbioru i odbioru. Oddzielne urządzenie akceptuje płatności od klientów i wysyła dane dotyczące taryf. Aby dostrzec trendy jazdy, firma taksówkarska chce obliczyć średnią wskazówkę na milę napędzaną dla każdej dzielnicy, w czasie rzeczywistym.

Pozyskiwanie danych

Aby zasymulować źródło danych, ta architektura referencyjna korzysta z zestawu danych danych taksówek w Nowym Jorku1. Ten zestaw danych zawiera dane dotyczące przejazdów taksówką w Nowym Jorku od 2010 do 2013 roku. Zawiera zarówno rekordy danych przejazdu, jak i taryfy. Dane dotyczące przejazdu obejmują czas trwania podróży, odległość podróży oraz lokalizacje odbioru i odbioru. Dane taryfy obejmują opłaty, podatki i kwoty porad. Pola w obu typach rekordów obejmują numer medalionu, licencję hack i identyfikator dostawcy. Połączenie tych trzech pól jednoznacznie identyfikuje taksówkę i kierowcę. Dane są przechowywane w formacie CSV.

[1] Donovan, Brian; Praca, Dan (2016): New York City Taxi Trip Data (2010-2013). Uniwersytet Illinois w Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Generator danych to aplikacja platformy .NET Core, która odczytuje rekordy i wysyła je do usługi Event Hubs. Generator wysyła dane przejazdu w formacie JSON i taryfy w formacie CSV.

Usługa Event Hubs używa partycji do segmentowania danych. Partycje umożliwiają użytkownikowi równoległe odczytywanie każdej partycji. Podczas wysyłania danych do usługi Event Hubs można bezpośrednio określić klucz partycji. W przeciwnym razie rekordy są przypisywane do partycji w sposób okrężny.

W tym scenariuszu dane przejazdu i dane taryfy powinny mieć przypisany ten sam identyfikator partycji dla określonej taksówki. To przypisanie umożliwia usłudze Databricks zastosowanie stopnia równoległości podczas korelowania dwóch strumieni. Na przykład rekord w partycji n danych przejazdu pasuje do rekordu w partycji n danych taryfy.

Diagram przetwarzania strumieniowego za pomocą usług Azure Databricks i Event Hubs.

Pobierz plik programu Visio tej architektury.

W generatorze danych wspólny model danych dla obu typów rekordów ma PartitionKey właściwość, która jest łączeniem Medallion, HackLicensei VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Ta właściwość służy do podawania jawnego klucza partycji podczas wysyłania danych do usługi Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Pojemność przepływności usługi Event Hubs jest mierzona w jednostkach przepływności. Możesz automatycznie skalować centrum zdarzeń, włączając automatyczne rozszerzanie. Ta funkcja automatycznie skaluje jednostki przepływności na podstawie ruchu do skonfigurowanej maksymalnej wartości.

Przetwarzanie strumieniowe

W usłudze Azure Databricks zadanie wykonuje przetwarzanie danych. Zadanie jest przypisywane do klastra, a następnie uruchamiane na nim. Zadanie może być kodem niestandardowym napisanym w języku Java lub notesem platformy Spark .

W tej architekturze referencyjnej zadanie to archiwum Języka Java, które zawiera klasy napisane w językach Java i Scala. Po określeniu archiwum Java dla zadania usługi Databricks klaster usługi Databricks określa klasę dla operacji. main Tutaj metoda com.microsoft.pnp.TaxiCabReader klasy zawiera logikę przetwarzania danych.

Odczytywanie strumienia z dwóch wystąpień centrum zdarzeń

Logika przetwarzania danych używa przesyłania strumieniowego ze strukturą platformy Spark do odczytu z dwóch wystąpień centrum zdarzeń platformy Azure:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Wzbogacanie danych o informacje o sąsiedztwie

Dane dotyczące przejazdu obejmują współrzędne geograficzne i współrzędne geograficzne lokalizacji odbioru i odbioru. Te współrzędne są przydatne, ale nie są łatwe do analizy. W związku z tym te dane są wzbogacone o dane sąsiedztwa odczytywane z pliku kształtu .

Format pliku shape jest binarny i nie jest łatwo analizowany. Jednak biblioteka GeoTools udostępnia narzędzia do danych geoprzestrzennych korzystających z formatu pliku shape. Ta biblioteka jest używana w klasie com.microsoft.pnp.GeoFinder w celu określenia nazwy sąsiedztwa na podstawie współrzędnych dla lokalizacji odbioru i odbioru.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Dołączanie do danych dotyczących przejazdu i taryfy

Najpierw dane dotyczące przejazdu i taryfy są przekształcane:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Następnie dane dotyczące przejazdu są łączone z danymi taryfy:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Przetwarzanie danych i wstawianie ich do usługi Azure Cosmos DB

Średnia kwota taryfy dla każdej dzielnicy jest obliczana dla określonego przedziału czasu:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Średnia kwota taryfy jest następnie wstawiana do usługi Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Kwestie wymagające rozważenia

Te zagadnienia implementują filary platformy Azure Well-Architected Framework, która jest zestawem wytycznych, których można użyć do poprawy jakości obciążenia. Aby uzyskać więcej informacji, zobacz Microsoft Azure Well-Architected Framework.

Zabezpieczenia

Zabezpieczenia zapewniają ochronę przed celowymi atakami i nieprawidłowym użyciem cennych danych i systemów. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotyczącazabezpieczeń.

Dostęp do obszaru roboczego usługi Azure Databricks jest kontrolowany przy użyciu konsoli administratora . Konsola administratora obejmuje funkcje dodawania użytkowników, zarządzania uprawnieniami użytkowników i konfigurowania logowania jednokrotnego. Kontrola dostępu dla obszarów roboczych, klastrów, zadań i tabel można również ustawić za pomocą konsoli administratora.

Zarządzanie wpisami tajnymi

Usługa Azure Databricks zawiera magazyn wpisów tajnych używany do przechowywania poświadczeń i odwołowania się do nich w notesach i zadaniach. Zakresy partycjonowania wpisów tajnych w magazynie wpisów tajnych usługi Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Wpisy tajne są dodawane na poziomie zakresu:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Uwaga

Użyj zakresu opartego na usłudze Azure Key Vault zamiast natywnego zakresu usługi Azure Databricks.

W kodzie dostęp do wpisów tajnych jest uzyskiwany za pośrednictwem narzędzi wpisów tajnych usługi Azure Databricks.

Optymalizacja kosztów

Optymalizacja kosztów koncentruje się na sposobach zmniejszenia niepotrzebnych wydatków i poprawy wydajności operacyjnej. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dlaoptymalizacji kosztów.

Koszty możesz szacować za pomocą kalkulatora cen platformy Azure. Rozważ następujące usługi używane w tej architekturze referencyjnej.

Zagadnienia dotyczące kosztów usługi Event Hubs

Ta architektura referencyjna wdraża usługę Event Hubs w warstwie Standardowa. Model cen jest oparty na jednostkach przepływności, zdarzeniach ruchu przychodzącego i przechwytywaniu zdarzeń. Zdarzenie ruchu przychodzącego to jednostka danych o rozmiarze 64 KB lub mniej. Większe komunikaty są rozliczane jako wielokrotność 64 KB. Jednostki przepływności można określić za pośrednictwem witryny Azure Portal lub interfejsów API zarządzania usługi Event Hubs.

Jeśli potrzebujesz więcej dni przechowywania, rozważ warstwę Dedykowana. Ta warstwa zapewnia wdrożenia z jedną dzierżawą, które mają rygorystyczne wymagania. Ta oferta tworzy klaster oparty na jednostkach pojemności i nie jest zależny od jednostek przepływności. Warstwa Standardowa jest również rozliczana na podstawie zdarzeń ruchu przychodzącego i jednostek przepływności.

Aby uzyskać więcej informacji, zobacz cennik usługi Event Hubs.

Zagadnienia dotyczące kosztów usługi Azure Databricks

Usługa Azure Databricks zapewnia warstwę Standardowa i warstwę Premium, z których obie obsługują trzy obciążenia. Ta architektura referencyjna wdraża obszar roboczy usługi Azure Databricks w warstwie Premium.

Obciążenia inżynieryjne danych powinny być uruchamiane w klastrze zadań. Inżynierowie danych używają klastrów do kompilowania i wykonywania zadań. Obciążenia analizy danych powinny być uruchamiane w klastrze ogólnego przeznaczenia i przeznaczone dla analityków danych do interaktywnego eksplorowania, wizualizowania, manipulowania nimi i udostępniania danych i szczegółowych informacji.

Usługa Azure Databricks udostępnia wiele modeli cenowych.

  • planu z płatnością zgodnie z rzeczywistym użyciem

    Opłaty są naliczane za maszyny wirtualne aprowidowane w klastrach i jednostkach usługi Azure Databricks (DBU) na podstawie wybranego wystąpienia maszyny wirtualnej. Jednostka DBU to jednostka możliwości przetwarzania, która jest rozliczana według użycia na sekundę. Użycie jednostek DBU zależy od rozmiaru i typu wystąpienia uruchomionego w usłudze Azure Databricks. Ceny zależą od wybranego obciążenia i warstwy.

  • planu przed zakupem

    Zatwierdzasz jednostki DBU jako jednostki zatwierdzeń usługi Azure Databricks przez co najmniej trzy lata, aby zmniejszyć całkowity koszt posiadania w tym okresie w porównaniu z modelem płatności zgodnie z rzeczywistym użyciem.

Aby uzyskać więcej informacji, zobacz cennik usługi Azure Databricks.

Zagadnienia dotyczące kosztów usługi Azure Cosmos DB

W tej architekturze zadanie usługi Azure Databricks zapisuje serię rekordów w usłudze Azure Cosmos DB. Opłaty są naliczane za rezerwową pojemność mierzoną w jednostkach żądań na sekundę (RU/s). Ta pojemność służy do wykonywania operacji wstawiania. Jednostka rozliczeń wynosi 100 RU/s na godzinę. Na przykład koszt pisania 100 KB elementów wynosi 50 RU/s.

W przypadku operacji zapisu aprowizuj wystarczającą pojemność, aby obsługiwać liczbę operacji zapisu na sekundę. Aprowizowaną przepływność można zwiększyć przy użyciu portalu lub interfejsu wiersza polecenia platformy Azure przed wykonaniem operacji zapisu, a następnie zmniejszyć przepływność po zakończeniu tych operacji. Przepływność okresu zapisu jest sumą minimalnej przepływności wymaganej dla określonych danych i przepływności wymaganej dla operacji wstawiania. W tym obliczeniu przyjęto założenie, że nie jest uruchomione żadne inne obciążenie.

Przykładowa analiza kosztów

Załóżmy, że skonfigurujesz wartość przepływności 1000 RU/s w kontenerze. Jest wdrażana przez 24 godziny przez 30 dni przez łącznie 720 godzin.

Opłaty za kontener są naliczane na 10 jednostek 100 RU/s na godzinę dla każdej godziny. Dziesięć jednostek na 0,008 USD (za 100 RU/s na godzinę) są naliczane na 0,08 USD za godzinę.

W przypadku 720 godzin lub 7200 jednostek (z 100 jednostek RU) opłaty są naliczane w wysokości 57,60 USD za miesiąc.

Opłaty za magazyn są również naliczane za każdy GB używany dla przechowywanych danych i indeksu. Aby uzyskać więcej informacji, zobacz Model cen usługi Azure Cosmos DB.

Skorzystaj z kalkulatora pojemności usługi Azure Cosmos DB , aby szybko oszacować koszt obciążenia.

Doskonałość operacyjna

Doskonałość operacyjna obejmuje procesy operacyjne, które wdrażają aplikację i działają w środowisku produkcyjnym. Aby uzyskać więcej informacji, zobacz Lista kontrolna przeglądu projektu dotycząca doskonałości operacyjnej.

Monitorowanie

Usługa Azure Databricks jest oparta na platformie Apache Spark. Zarówno usługa Azure Databricks, jak i platforma Apache Spark używają Apache Log4j jako standardowej biblioteki rejestrowania. Oprócz domyślnego rejestrowania zapewnianych przez platformę Apache Spark można zaimplementować rejestrowanie w usłudze Log Analytics. Aby uzyskać więcej informacji, zobacz Monitorowanie usługi Azure Databricks.

Ponieważ klasa com.microsoft.pnp.TaxiCabReader przetwarza komunikaty dotyczące przejazdu i taryfy, komunikat może być źle sformułowany i dlatego nieprawidłowy. W środowisku produkcyjnym ważne jest analizowanie tych źle sformułowanych komunikatów w celu zidentyfikowania problemu ze źródłami danych, dzięki czemu można je szybko naprawić, aby zapobiec utracie danych. Klasa com.microsoft.pnp.TaxiCabReader rejestruje akumulator platformy Apache Spark, który śledzi liczbę nieprawidłowo sformułowanych rekordów taryf i rekordów jazdy:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Platforma Apache Spark używa biblioteki Dropwizard do wysyłania metryk. Niektóre z natywnych pól metryk Dropwizard są niezgodne z usługą Log Analytics, dlatego ta architektura referencyjna zawiera niestandardowy ujście i reporter dropwizard. Formatuje metryki w formacie oczekiwanym przez usługę Log Analytics. W przypadku raportowania metryk platformy Apache Spark wysyłane są również metryki niestandardowe dla źle sformułowanych danych przejazdu i taryfy.

Aby monitorować działanie zadania przesyłania strumieniowego, możesz użyć następujących przykładowych zapytań w obszarze roboczym usługi Log Analytics. Argument ago(1d) w każdym zapytaniu zwraca wszystkie rekordy, które zostały wygenerowane w ciągu ostatniego dnia. Możesz dostosować ten parametr, aby wyświetlić inny okres.

Wyjątki rejestrowane podczas operacji zapytania strumienia

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Akumulacja źle sformułowanych taryf i danych przejazdu

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Operacja zadania w czasie

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Organizacja zasobów i wdrożenia

  • Utwórz oddzielne grupy zasobów dla środowisk produkcyjnych, programistycznych i testowych. Oddzielne grupy zasobów ułatwiają zarządzanie wdrożeniami, usuwanie wdrożeń testowych i przypisywanie praw dostępu.

  • Użyj szablonu usługi Azure Resource Manager, aby wdrożyć zasoby platformy Azure zgodnie z procesem infrastruktury jako kodu. Za pomocą szablonów można łatwo zautomatyzować wdrożenia za pomocą usług Azure DevOps services lub innych rozwiązań ciągłej integracji i ciągłego dostarczania (CI/CD).

  • Umieść każde obciążenie w osobnym szablonie wdrożenia i zapisz zasoby w systemach kontroli źródła. Szablony można wdrażać razem lub indywidualnie w ramach procesu ciągłej integracji/ciągłego wdrażania. Takie podejście upraszcza proces automatyzacji.

    W tej architekturze usługi Event Hubs, Log Analytics i Azure Cosmos DB są identyfikowane jako pojedyncze obciążenie. Te zasoby są uwzględniane w jednym szablonie usługi Azure Resource Manager.

  • Rozważ przemieszczanie obciążeń. Przed przejściem do następnego etapu wdróż na różnych etapach i uruchom testy weryfikacyjne. Dzięki temu możesz kontrolować sposób wypychania aktualizacji do środowisk produkcyjnych i minimalizowania nieprzewidzianych problemów z wdrażaniem.

    W tej architekturze istnieje wiele etapów wdrażania. Rozważ utworzenie potoku usługi Azure DevOps i dodanie tych etapów. Możesz zautomatyzować następujące etapy:

    • Uruchom klaster usługi Databricks.
    • Konfigurowanie interfejsu wiersza polecenia usługi Databricks.
    • Zainstaluj narzędzia języka Scala.
    • Dodaj wpisy tajne usługi Databricks.

    Rozważ napisanie zautomatyzowanych testów integracji, aby poprawić jakość i niezawodność kodu usługi Databricks i jego cyklu życia.

Wdrażanie tego scenariusza

Aby wdrożyć i uruchomić implementację referencyjną, wykonaj kroki opisane w pliku readme usługi GitHub.

Następny krok