Udostępnij za pośrednictwem


Wzorce zadań replikacji zdarzeń

Omówienie federacji i omówienie funkcji replikatora wyjaśniają uzasadnienie i podstawowe elementy zadań replikacji. Zaleca się zapoznanie się z nimi przed kontynuowaniem pracy z tym artykułem.

W tym artykule szczegółowo przedstawiono wskazówki dotyczące implementacji kilku wzorców wyróżnionych w sekcji przeglądu.

Replikacja

Wzorzec replikacji kopiuje zdarzenia z jednego centrum zdarzeń do następnego lub z centrum zdarzeń do innego miejsca docelowego, takiego jak kolejka usługi Service Bus. Zdarzenia są przekazywane bez wprowadzania żadnych modyfikacji ładunku zdarzenia.

Implementacja tego wzorca jest objęta replikacją zdarzeń między usługą Event Hubs i replikacją zdarzeń między usługą Event Hubs i przykładami usługi Service Bus oraz samouczkiem Use Apache Kafka MirrorMaker with Event Hubs (Używanie narzędzia Apache Kafka MirrorMaker z usługą Event Hubs) w celu uzyskania konkretnego przypadku replikowania danych z brokera platformy Apache Kafka do usługi Event Hubs.

Strumienie i zachowanie porządku

Replikacja za pośrednictwem usługi Azure Functions lub Azure Stream Analytics nie ma na celu zapewnienia utworzenia dokładnych klonów 1:1 źródłowego centrum zdarzeń w docelowym centrum zdarzeń, ale koncentruje się na zachowaniu względnej kolejności zdarzeń, w których aplikacja tego wymaga. Aplikacja komunikuje się z tym przez grupowanie powiązanych zdarzeń przy użyciu tego samego klucza partycji, a usługa Event Hubs rozmieszcza komunikaty z tym samym kluczem partycji sekwencyjnie w tej samej partycji.

Ważne

Informacje "przesunięcie" są unikatowe dla każdego centrum zdarzeń i przesunięcia dla tych samych zdarzeń będą się różnić w przypadku wystąpień centrum zdarzeń. Aby zlokalizować pozycję w skopiowanym strumieniu zdarzeń, użyj przesunięć opartych na czasie i odwołaj się do propagowanych metadanych przypisanych do usługi.

Przesunięcia oparte na czasie uruchamiają odbiornik w określonym punkcie w czasie:

  • EventPosition.FromStart() — ponownie odczytaj wszystkie zachowane dane.
  • EventPosition.FromEnd() — odczyt wszystkich nowych danych z czasu połączenia.
  • EventPosition.FromEnqueuedTime(dateTime) — wszystkie dane rozpoczynające się od danej daty i godziny.

W elemecie EventProcessor należy ustawić pozycję za pośrednictwem elementu InitialOffsetProvider w obiekcie EventProcessorOptions. W przypadku innych interfejsów API odbiornika pozycja jest przekazywana przez konstruktor.

Wstępnie skompilowane narzędzia pomocnicze funkcji replikacji udostępniane jako przykłady używane w wytycznych opartych na usłudze Azure Functions zapewniają, że strumienie zdarzeń z tym samym kluczem partycji pobranym z partycji źródłowej są przesyłane do docelowego centrum zdarzeń jako partii w oryginalnym strumieniu i z tym samym kluczem partycji.

Jeśli liczba partycji źródłowego i docelowego centrum zdarzeń jest identyczna, wszystkie strumienie w obiekcie docelowym będą mapować na te same partycje, co w źródle. Jeśli liczba partycji jest inna, co ma znaczenie w niektórych dalszych wzorcach opisanych w poniższym, mapowanie będzie się różnić, ale strumienie są zawsze przechowywane razem i w kolejności.

Względna kolejność zdarzeń należących do różnych strumieni lub niezależnych zdarzeń bez klucza partycji w partycji docelowej może zawsze różnić się od partycji źródłowej.

Metadane przypisane przez usługę

Metadane przypisane przez usługę zdarzenia uzyskane ze źródłowego centrum zdarzeń, oryginalny czas kolejkowania, numer sekwencji i przesunięcie, są zastępowane nowymi wartościami przypisanymi przez usługę w docelowym centrum zdarzeń, ale z funkcjami pomocnika zadania replikacji podane w naszych przykładach oryginalne wartości są zachowywane we właściwościach użytkownika: repl-enqueue-time (ISO8601 ciąg), repl-sequence, repl-offset.

Te właściwości są typu ciąg i zawierają ciągyfikowaną wartość odpowiednich oryginalnych właściwości. Jeśli zdarzenie jest przekazywane wiele razy, metadane przypisane przez usługę natychmiastowego źródła są dołączane do już istniejących właściwości z wartościami rozdzielonymi średnikami.

Tryb failover

Jeśli używasz replikacji do celów odzyskiwania po awarii, aby chronić przed regionalnymi zdarzeniami dostępności w usłudze Event Hubs lub przed przerwami w działaniu sieci, każdy taki scenariusz awarii będzie wymagał przejścia w tryb failover z jednego centrum zdarzeń do następnego, informując producentów i/lub konsumentów o używaniu pomocniczego punktu końcowego.

W przypadku wszystkich scenariuszy trybu failover zakłada się, że wymagane elementy przestrzeni nazw są strukturalnie identyczne, co oznacza, że usługi Event Hubs i grupy odbiorców są identycznie nazwane i że reguły sygnatur dostępu współdzielonego i/lub reguły kontroli dostępu opartej na rolach są konfigurowane w taki sam sposób. Możesz utworzyć (i zaktualizować) pomocniczą przestrzeń nazw, postępując zgodnie ze wskazówkami dotyczącymi przenoszenia przestrzeni nazw i pomijania kroku oczyszczania.

Aby zmusić producentów i konsumentów do przełączenia, należy udostępnić informacje o przestrzeni nazw, która ma być używana do wyszukiwania w lokalizacji, która jest łatwa do uzyskania i zaktualizowania. Jeśli producenci lub konsumenci napotykają częste lub trwałe błędy, powinni skonsultować się z tym miejscem i dostosować konfigurację. Istnieje wiele sposobów udostępniania tej konfiguracji, ale wskazujemy dwa w następujących kwestiach: DNS i udziały plików.

Konfiguracja trybu failover oparta na systemie DNS

Jednym z kandydatów jest utrzymywanie informacji w rekordach SRV DNS w kontrolce DNS i wskazywaniu odpowiednich punktów końcowych centrum zdarzeń.

Ważne

Należy pamiętać, że usługa Event Hubs nie zezwala na bezpośrednie aliasy punktów końcowych z rekordami CNAME, co oznacza, że użyjesz systemu DNS jako odpornego mechanizmu wyszukiwania dla adresów końcowych, a nie do bezpośredniego rozpoznawania informacji o adresach IP.

Załóżmy, że jesteś właścicielem domeny example.com i dla aplikacji strefą test.example.com. W przypadku dwóch alternatywnych usługi Event Hubs utworzysz teraz dwie kolejne strefy zagnieżdżone i rekord SRV w każdym z nich.

Rekordy SRV są zgodne ze wspólną konwencją z prefiksem _azure_eventhubs._amqp i zawierają dwa rekordy punktu końcowego: jeden dla protokołu AMQP-over-TLS na porcie 5671 i jeden dla protokołu AMQP-over-WebSockets na porcie 443, zarówno wskazując punkt końcowy usługi Event Hubs przestrzeni nazw odpowiadającej strefie.

Strefa Rekord SRV
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

W strefie aplikacji utworzysz wpis CNAME wskazujący strefę podrzędną odpowiadającą głównemu centrum zdarzeń:

Rekord CNAME Alias
eventhub.test.example.com eh1.test.example.com

Korzystając z klienta DNS, który umożliwia jawne wykonywanie zapytań dotyczących rekordów CNAME i SRV (wbudowanych klientów języka Java i platformy .NET zezwala tylko na proste rozpoznawanie nazw na adresy IP), można następnie rozpoznać żądany punkt końcowy. Na przykład w przypadku DnsClient.NET funkcja wyszukiwania to:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

Funkcja zwraca docelową nazwę hosta zarejestrowaną dla portu 5671 strefy, która jest obecnie aliasem cNAME, jak pokazano powyżej.

Przejście w tryb failover wymaga edytowania rekordu CNAME i wskazywania go w strefie alternatywnej.

Zaletą korzystania z usługi DNS, a w szczególności usługi Azure DNS, jest to, że informacje usługi Azure DNS są globalnie replikowane i dlatego odporne na awarie w jednym regionie.

Ta procedura jest podobna do sposobu działania geo-odzyskiwania po awarii usługi Event Hubs, ale w pełni pod własną kontrolą, a także działa w scenariuszach aktywnych/aktywnych.

Konfiguracja trybu failover opartego na udziale plików

Najprostszą alternatywą dla używania systemu DNS do udostępniania informacji o punkcie końcowym jest umieszczenie nazwy podstawowego punktu końcowego w pliku zwykłego tekstu i udostępnienie pliku z infrastruktury, która jest niezawodna w przypadku awarii i nadal zezwala na aktualizacje.

Jeśli już uruchomiono infrastrukturę witryn sieci Web o wysokiej dostępności z globalną dostępnością i replikacją zawartości, dodaj tam taki plik i opublikuj go ponownie, jeśli jest potrzebny przełącznik.

Uwaga

W ten sposób należy opublikować tylko nazwę punktu końcowego, a nie pełną parametry połączenia zawierającą wpisy tajne.

Dodatkowe zagadnienia dotyczące przełączania użytkowników w tryb failover

W przypadku użytkowników usługi Event Hub dalsze zagadnienia dotyczące strategii trybu failover zależą od potrzeb procesora zdarzeń.

Jeśli wystąpi awaria, która wymaga ponownego skompilowania systemu, w tym baz danych, z danych kopii zapasowej, a bazy danych są przekazywane bezpośrednio lub za pośrednictwem pośredniego przetwarzania ze zdarzeń przechowywanych w centrum zdarzeń, należy przywrócić kopię zapasową, a następnie uruchomić ponowne odtwarzanie zdarzeń do systemu od momentu utworzenia kopii zapasowej bazy danych, a nie od momentu zniszczenia oryginalnego systemu.

Jeśli awaria ma wpływ tylko na fragment systemu, a nawet tylko pojedyncze centrum zdarzeń, które stało się niedostępne, prawdopodobnie zechcesz kontynuować przetwarzanie zdarzeń z około tej samej pozycji, w której przetwarzanie zostało przerwane.

Aby zrealizować dowolny scenariusz i użyć procesora zdarzeń odpowiedniego zestawu Azure SDK, utworzysz nowy magazyn punktów kontrolnych i udostępnisz początkową pozycję partycji na podstawie znacznika czasu, z którego chcesz wznowić przetwarzanie.

Jeśli nadal masz dostęp do magazynu punktów kontrolnych centrum zdarzeń, z którego się odejdziesz, propagowane metadane omówione powyżej ułatwią pominięcie zdarzeń, które zostały już obsłużone i wznowione dokładnie z miejsca, w którym ostatnio zostało przerwane.

Scal

Wzorzec scalania zawiera co najmniej jedno zadanie replikacji wskazujące jeden element docelowy, prawdopodobnie współbieżnie z regularnymi producentami, którzy również wysyłają zdarzenia do tego samego celu.

Odmiany tych patterów to:

  • Co najmniej dwie funkcje replikacji jednocześnie uzyskują zdarzenia z oddzielnych źródeł i wysyłają je do tego samego obiektu docelowego.
  • Jeszcze jedna funkcja replikacji uzyskująca zdarzenia ze źródła, podczas gdy element docelowy jest również używany bezpośrednio przez producentów.
  • Poprzedni wzorzec, ale dublowany między co najmniej dwoma centrami zdarzeń, co powoduje, że te centra zdarzeń zawierają te same strumienie, niezależnie od tego, gdzie są generowane zdarzenia.

Pierwsze dwie odmiany wzorca są proste i nie różnią się od zwykłych zadań replikacji.

Ostatni scenariusz wymaga ponownego zreplikowania zreplikowanych zdarzeń. Technika została pokazana i wyjaśniona w przykładzie EventHubToEventHubMerge .

Redaktor

Wzorzec edytora opiera się na wzorcu replikacji , ale komunikaty są modyfikowane przed ich przekazaniem.

Przykłady takich modyfikacji to:

  • Transkodowanie — jeśli zawartość zdarzenia (nazywana również zawartością "treść" lub "ładunek") dociera ze źródła zakodowanego przy użyciu formatu Apache Avro lub zastrzeżonego formatu serializacji, ale oczekiwanie systemu będącego właścicielem obiektu docelowego jest dla zawartości zakodowanej w formacie JSON, zadanie replikacji transkodowania najpierw zdeserializuje ładunek z apache Avro do grafu obiektu w pamięci, a następnie serializuje ten graf do formatu JSON format zdarzenia, które jest przekazywane. Transkodowanie obejmuje również zadania kompresji i dekompresacji zawartości.
  • Przekształcanie — zdarzenia zawierające dane ustrukturyzowane mogą wymagać zmiany tych danych w celu łatwiejszego użycia przez odbiorców podrzędnych. Może to obejmować pracę, taką jak spłaszczanie zagnieżdżonych struktur, oczyszczanie nadmiarowych elementów danych lub zmiana ładunku w taki sposób, aby dokładnie pasował do danego schematu.
  • Przetwarzanie wsadowe — zdarzenia mogą być odbierane w partiach (wiele zdarzeń w jednym transferze) ze źródła, ale muszą być przekazywane pojedynczo do miejsca docelowego lub odwrotnie. W związku z tym zadanie może przekazywać wiele zdarzeń na podstawie jednego transferu zdarzeń wejściowych lub agregować zestaw zdarzeń, które następnie są przesyłane razem.
  • Walidacja — dane zdarzeń ze źródeł zewnętrznych często muszą być sprawdzane pod kątem zgodności z zestawem reguł przed ich przekazaniem. Reguły mogą być wyrażane przy użyciu schematów lub kodu. Zdarzenia, które nie są zgodne, mogą zostać porzucone, z problemem zanotowanym w dziennikach lub mogą być przekazywane do specjalnego docelowego miejsca docelowego, aby obsłużyć je dalej.
  • Wzbogacanie — dane zdarzeń pochodzące z niektórych źródeł mogą wymagać wzbogacania z dalszym kontekstem, aby można je było używać w systemach docelowych. Może to obejmować wyszukanie danych referencyjnych i osadzanie tych danych ze zdarzeniem lub dodanie informacji o źródle, które jest znane zadaniu replikacji, ale nie zawarte w zdarzeniach.
  • Filtrowanie — niektóre zdarzenia pochodzące ze źródła mogą być wstrzymane z obiektu docelowego na podstawie określonej reguły. Filtr testuje zdarzenie względem reguły i odrzuca zdarzenie, jeśli zdarzenie nie jest zgodne z regułą. Filtrowanie zduplikowanych zdarzeń przez obserwowanie określonych kryteriów i porzucanie kolejnych zdarzeń o tych samych wartościach jest formą filtrowania.
  • Kryptografia — zadanie replikacji może wymagać odszyfrowywania zawartości pochodzącej ze źródła i/lub szyfrowania zawartości przesyłanej dalej do obiektu docelowego i/lub może być konieczne zweryfikowanie integralności zawartości i metadanych względem podpisu przenoszonego w przypadku lub dołączenia takiego podpisu.
  • Zaświadczanie — zadanie replikacji może dołączać metadane, potencjalnie chronione przez podpis cyfrowy, do zdarzenia, które potwierdza, że zdarzenie zostało odebrane za pośrednictwem określonego kanału lub w określonym czasie.
  • Łańcuch — zadanie replikacji może stosować podpisy do strumieni zdarzeń, tak aby integralność strumienia strumienia jest chroniona i można wykryć brakujące zdarzenia.

Wzorce przekształcania, dzielenia na partie i wzbogacania są zwykle najlepiej implementowane za pomocą zadań usługi Azure Stream Analytics .

Wszystkie te wzorce można zaimplementować przy użyciu usługi Azure Functions, używając wyzwalacza usługi Event Hubs do uzyskiwania zdarzeń i powiązania wyjściowego centrum zdarzeń na potrzeby ich dostarczania.

Routing

Wzorzec routingu opiera się na wzorcu replikacji, ale zamiast jednego źródła i jednego miejsca docelowego zadanie replikacji ma wiele obiektów docelowych, zilustrowanych tutaj w języku C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

Funkcja routingu będzie uwzględniać metadane komunikatów i/lub ładunek komunikatu, a następnie wybrać jedno z dostępnych miejsc docelowych do wysłania.

W usłudze Azure Stream Analytics można osiągnąć to samo, definiując wiele danych wyjściowych, a następnie wykonując zapytanie na dane wyjściowe.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Projekcja dziennika

Wzorzec projekcji dziennika spłaszcza strumień zdarzeń na indeksowaną bazę danych, a zdarzenia stają się rekordami w bazie danych. Zazwyczaj zdarzenia są dodawane do tej samej kolekcji lub tabeli, a klucz partycji centrum zdarzeń staje się częścią klucza podstawowego, który szuka unikatowego rekordu.

Projekcja dziennika może wygenerować historyk szeregów czasowych danych zdarzeń lub zwarty widok, w którym tylko najnowsze zdarzenie jest zachowywane dla każdego klucza partycji. Kształt docelowej bazy danych zależy ostatecznie od Ciebie i potrzeb twojej aplikacji. Ten wzorzec jest również określany jako "określanie źródła zdarzeń".

Napiwek

Projekcje dzienników można łatwo tworzyć w usługach Azure SQL Database i Azure Cosmos DB w usłudze Azure Stream Analytics. Warto też wybrać tę opcję.

Poniższa funkcja platformy Azure projektuje zawartość centrum zdarzeń skompaktowanych do kolekcji usługi Azure Cosmos DB.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Następne kroki