Udostępnij za pośrednictwem


Subskrybowanie zdarzeń

Napiwek

Ta zawartość jest fragmentem książki eBook, architektury mikrousług platformy .NET dla konteneryzowanych aplikacji platformy .NET dostępnych na platformie .NET Docs lub jako bezpłatnego pliku PDF, który można odczytać w trybie offline.

.NET Microservices Architecture for Containerized .NET Applications eBook cover thumbnail.

Pierwszym krokiem korzystania z magistrali zdarzeń jest subskrybowanie mikrousług do zdarzeń, które chcą odbierać. Ta funkcja powinna być wykonywana w mikrousługach odbiorcy.

Poniższy prosty kod pokazuje, co każda mikrousługa odbiorcy musi zaimplementować podczas uruchamiania usługi (czyli w Startup klasie), aby subskrybować zdarzenia, których potrzebuje. W takim przypadku mikrousługa basket-api musi subskrybować ProductPriceChangedIntegrationEvent i OrderStartedIntegrationEvent komunikaty.

Na przykład podczas subskrybowania ProductPriceChangedIntegrationEvent zdarzenia mikrousługi koszyka informuje o wszelkich zmianach w cenie produktu i umożliwia użytkownikowi ostrzeżenie o zmianie, jeśli ten produkt znajduje się w koszyku użytkownika.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

Po uruchomieniu tego kodu mikrousługę subskrybenta będzie nasłuchiwać za pośrednictwem kanałów RabbitMQ. Po nadejściu dowolnego komunikatu typu ProductPriceChangedIntegrationEvent kod wywołuje program obsługi zdarzeń przekazywany do niego i przetwarza zdarzenie.

Publikowanie zdarzeń za pośrednictwem magistrali zdarzeń

Na koniec nadawca komunikatu (mikrousługa pochodzenia) publikuje zdarzenia integracji z kodem podobnym do poniższego przykładu. (Takie podejście jest uproszczonym przykładem, który nie uwzględnia niepodzielności). Można zaimplementować podobny kod za każdym razem, gdy zdarzenie musi być propagowane w wielu mikrousługach, zwykle bezpośrednio po zatwierdzeniu danych lub transakcji z mikrousługi pochodzenia.

Najpierw obiekt implementacji magistrali zdarzeń (oparty na RabbitMQ lub oparty na magistrali usług) zostanie wstrzyknięty do konstruktora kontrolera, jak w poniższym kodzie:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Następnie użyjesz go z metod kontrolera, takich jak w metodzie UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

W takim przypadku, ponieważ mikrousługa pochodzenia jest prostą mikrousługą CRUD, kod jest umieszczany bezpośrednio w kontrolerze internetowego interfejsu API.

W bardziej zaawansowanych mikrousługach, takich jak w przypadku korzystania z metod CQRS, można ją zaimplementować w klasie w CommandHandler ramach Handle() metody .

Projektowanie niepodzielności i odporności podczas publikowania w magistrali zdarzeń

Podczas publikowania zdarzeń integracji za pośrednictwem rozproszonego systemu obsługi komunikatów, takiego jak magistrala zdarzeń, występuje problem niepodzielnego aktualizowania oryginalnej bazy danych i publikowania zdarzenia (czyli zarówno operacji zakończonych, jak i żadnego z nich). Na przykład w uproszczonym przykładzie przedstawionym wcześniej kod zatwierdza dane do bazy danych po zmianie ceny produktu, a następnie publikuje komunikat ProductPriceChangedIntegrationEvent. Początkowo może się wydawać, że te dwie operacje mogą być wykonywane niepodziealnie. Jeśli jednak używasz transakcji rozproszonej z udziałem bazy danych i brokera komunikatów, tak jak w starszych systemach, takich jak Microsoft Message Queuing (MSMQ), to podejście nie jest zalecane z powodów opisanych przez twierdzenie CAP.

Zasadniczo mikrousługi służą do tworzenia skalowalnych i wysoce dostępnych systemów. Upraszczając nieco, twierdzenie CAP mówi, że nie można utworzyć (rozproszonej) bazy danych (lub mikrousługi, która jest właścicielem modelu), która jest stale dostępna, silnie spójna i odporna na dowolną partycję. Należy wybrać dwie z tych trzech właściwości.

W architekturach opartych na mikrousługach należy wybrać dostępność i tolerancję. Należy też de-podkreślać silną spójność. W związku z tym w większości nowoczesnych aplikacji opartych na mikrousługach zwykle nie chcesz używać transakcji rozproszonych w komunikatach, tak jak podczas implementowania transakcji rozproszonych opartych na koordynatorze transakcji rozproszonych systemu Windows (DTC) za pomocą msMQ.

Wróćmy do początkowego problemu i jego przykładu. Jeśli usługa ulegnie awarii po zaktualizowaniu bazy danych (w tym przypadku bezpośrednio po wierszu kodu za _context.SaveChangesAsync()pomocą polecenia ), ale przed opublikowaniem zdarzenia integracji ogólny system może stać się niespójny. Takie podejście może mieć krytyczne znaczenie dla działania firmy, w zależności od określonej operacji biznesowej, z którą masz do czynienia.

Jak wspomniano wcześniej w sekcji architektury, możesz mieć kilka podejść do radzenia sobie z tym problemem:

  • Używanie pełnego wzorca określania źródła zdarzeń.

  • Korzystanie z wyszukiwania dzienników transakcji.

  • Za pomocą wzorca skrzynka nadawcza. Jest to tabela transakcyjna do przechowywania zdarzeń integracji (rozszerzanie transakcji lokalnych).

W tym scenariuszu użycie pełnego wzorca określania źródła zdarzeń (ES) jest jednym z najlepszych metod, jeśli nie jest to najlepsze. Jednak w wielu scenariuszach aplikacji może nie być możliwe zaimplementowanie pełnego systemu ES. ES oznacza przechowywanie tylko zdarzeń domeny w transakcyjnej bazie danych zamiast przechowywania bieżących danych stanu. Przechowywanie tylko zdarzeń domeny może mieć duże korzyści, takie jak posiadanie dostępnej historii systemu i możliwość określenia stanu systemu w dowolnym momencie w przeszłości. Jednak zaimplementowanie pełnego systemu ES wymaga ponownej architektury większości systemu i wprowadzenia wielu innych złożoności i wymagań. Na przykład chcesz użyć bazy danych specjalnie utworzonej do określania źródła zdarzeń, takiego jak Magazyn zdarzeń lub bazy danych zorientowanej na dokumenty, takiej jak Azure Cosmos DB, MongoDB, Cassandra, CouchDB lub RavenDB. ES jest doskonałym rozwiązaniem tego problemu, ale nie najprostszym rozwiązaniem, chyba że znasz już określanie źródła zdarzeń.

Opcja korzystania z funkcji wyszukiwania dzienników transakcji początkowo wygląda przezroczystie. Jednak aby użyć tego podejścia, mikrousługę należy połączyć z dziennikem transakcji RDBMS, takim jak dziennik transakcji programu SQL Server. Takie podejście prawdopodobnie nie jest pożądane. Kolejną wadą jest to, że aktualizacje niskiego poziomu zarejestrowane w dzienniku transakcji mogą nie być na tym samym poziomie co zdarzenia integracji wysokiego poziomu. Jeśli tak, proces odwrotnej inżynierii tych operacji dziennika transakcji może być trudny.

Zrównoważone podejście to kombinacja transakcyjnej tabeli bazy danych i uproszczonego wzorca ES. Możesz użyć stanu takiego jak "gotowy do opublikowania zdarzenia", który został ustawiony w oryginalnym zdarzeniu podczas zatwierdzania go w tabeli zdarzeń integracji. Następnie spróbujesz opublikować zdarzenie w magistrali zdarzeń. Jeśli akcja publikowania zdarzenia powiedzie się, uruchom kolejną transakcję w usłudze pochodzenia i przenieś stan z "gotowego do opublikowania zdarzenia" na "zdarzenie zostało już opublikowane".

Jeśli akcja publikowania zdarzenia w magistrali zdarzeń nie powiedzie się, dane nadal nie będą niespójne w mikrousłudze pochodzenia — nadal są one oznaczone jako "gotowe do opublikowania zdarzenia" i w odniesieniu do pozostałych usług, ostatecznie będą spójne. Zawsze możesz mieć zadania w tle sprawdzające stan transakcji lub zdarzeń integracji. Jeśli zadanie znajdzie zdarzenie w stanie "gotowy do opublikowania zdarzenia", może spróbować ponownie opublikować to zdarzenie w magistrali zdarzeń.

Zwróć uwagę, że w przypadku tego podejścia utrwalone są tylko zdarzenia integracji dla każdej mikrousługi pochodzenia i tylko zdarzenia, które mają komunikować się z innymi mikrousługami lub systemami zewnętrznymi. Z kolei w pełnym systemie ES przechowujesz również wszystkie zdarzenia domeny.

W związku z tym takie zrównoważone podejście jest uproszczonym systemem ES. Potrzebna jest lista zdarzeń integracji ze swoim bieżącym stanem ("gotowy do opublikowania" i "opublikowany"). Należy jednak zaimplementować tylko te stany dla zdarzeń integracji. W tym podejściu nie trzeba przechowywać wszystkich danych domeny jako zdarzeń w transakcyjnej bazie danych, tak jak w pełnym systemie ES.

Jeśli używasz już relacyjnej bazy danych, możesz użyć tabeli transakcyjnej do przechowywania zdarzeń integracji. Aby osiągnąć niepodzielność w aplikacji, należy użyć dwuetapowego procesu opartego na transakcjach lokalnych. Zasadniczo masz tabelę IntegrationEvent w tej samej bazie danych, w której masz jednostki domeny. Ta tabela działa jako ubezpieczenie umożliwiające osiągnięcie niepodzielności, dzięki czemu dołączono utrwalone zdarzenia integracji do tych samych transakcji, które zatwierdzają dane domeny.

Krok po kroku proces wygląda następująco:

  1. Aplikacja rozpoczyna transakcję lokalnej bazy danych.

  2. Następnie aktualizuje stan jednostek domeny i wstawia zdarzenie do tabeli zdarzeń integracji.

  3. Na koniec zatwierdza transakcję, więc uzyskasz żądaną niepodzielność, a następnie

  4. W jakiś sposób publikujesz zdarzenie (dalej).

Podczas implementowania kroków publikowania zdarzeń dostępne są następujące opcje:

  • Opublikuj zdarzenie integracji bezpośrednio po zatwierdzeniu transakcji i użyj innej transakcji lokalnej, aby oznaczyć zdarzenia w tabeli jako opublikowane. Następnie użyj tabeli jako artefaktu, aby śledzić zdarzenia integracji w przypadku problemów w zdalnych mikrousług i wykonywać akcje wyrównywujące na podstawie przechowywanych zdarzeń integracji.

  • Użyj tabeli jako rodzaju kolejki. Oddzielny wątek aplikacji lub proces wysyła zapytanie do tabeli zdarzeń integracji, publikuje zdarzenia w magistrali zdarzeń, a następnie używa transakcji lokalnej do oznaczania zdarzeń jako opublikowanych.

Rysunek 6–22 przedstawia architekturę dla pierwszego z tych podejść.

Diagram of atomicity when publishing without a worker microservice.

Rysunek 6–22. Niepodzielność podczas publikowania zdarzeń w magistrali zdarzeń

Na rysunku 6–22 brakuje dodatkowej mikrousługi procesu roboczego, która jest odpowiedzialna za sprawdzanie i potwierdzanie powodzenia opublikowanych zdarzeń integracji. W przypadku awarii, że dodatkowa mikrousługa procesu kontrolnego może odczytywać zdarzenia z tabeli i ponownie je publikować, czyli powtarzać krok 2.

Informacje o drugim podejściu: używasz tabeli EventLog jako kolejki i zawsze używasz mikrousługi procesu roboczego do publikowania komunikatów. W takim przypadku proces jest podobny do przedstawionego na rysunku 6–23. Spowoduje to wyświetlenie dodatkowej mikrousługi, a tabela jest pojedynczym źródłem podczas publikowania zdarzeń.

Diagram of atomicity when publishing with a worker microservice.

Rysunek 6–23. Niepodzielność podczas publikowania zdarzeń w magistrali zdarzeń za pomocą mikrousługi procesu roboczego

Dla uproszczenia przykład eShopOnContainers używa pierwszego podejścia (bez dodatkowych procesów lub mikrousług kontrolnych) oraz magistrali zdarzeń. Jednak przykład eShopOnContainers nie obsługuje wszystkich możliwych przypadków awarii. W rzeczywistej aplikacji wdrożonej w chmurze należy uwzględnić fakt, że problemy pojawią się w końcu, i musisz zaimplementować tę logikę sprawdzania i ponownego wysłania. Użycie tabeli jako kolejki może być bardziej skuteczne niż pierwsze podejście, jeśli ta tabela jest jednym źródłem zdarzeń podczas publikowania ich (z procesem roboczym) za pośrednictwem magistrali zdarzeń.

Implementowanie niepodzielności podczas publikowania zdarzeń integracji za pośrednictwem magistrali zdarzeń

Poniższy kod pokazuje, jak utworzyć jedną transakcję obejmującą wiele obiektów DbContext — jeden kontekst związany z aktualizowanym oryginalnymi danymi, a drugi kontekst związany z tabelą IntegrationEventLog.

Transakcja w poniższym przykładowym kodzie nie będzie odporna, jeśli połączenia z bazą danych mają jakikolwiek problem w momencie uruchomienia kodu. Może się to zdarzyć w systemach opartych na chmurze, takich jak usługa Azure SQL DB, która może przenosić bazy danych między serwerami. Aby zaimplementować odporne transakcje w wielu kontekstach, zobacz sekcję Implementowanie odpornych połączeń SQL platformy Entity Framework Core w dalszej części tego przewodnika.

W poniższym przykładzie pokazano cały proces w jednym fragcie kodu. Jednak implementacja eShopOnContainers jest refaktoryzowana i dzieli tę logikę na wiele klas, aby ułatwić konserwację.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

Po utworzeniu zdarzenia integracji ProductPriceChangedIntegrationEvent transakcja przechowująca oryginalną operację domeny (aktualizując element wykazu) obejmuje również trwałość zdarzenia w tabeli EventLog. Dzięki temu jest to jedna transakcja i zawsze będzie można sprawdzić, czy komunikaty o zdarzeniach zostały wysłane.

Tabela dziennika zdarzeń jest aktualizowana niepodziecznie przy użyciu oryginalnej operacji bazy danych przy użyciu transakcji lokalnej względem tej samej bazy danych. Jeśli którakolwiek z operacji zakończy się niepowodzeniem, zgłaszany jest wyjątek, a transakcja cofa wszystkie ukończone operacje, zachowując spójność między operacjami domeny a komunikatami zdarzeń zapisanymi w tabeli.

Odbieranie komunikatów z subskrypcji: programy obsługi zdarzeń w mikrousługach odbiorcy

Oprócz logiki subskrypcji zdarzeń należy zaimplementować kod wewnętrzny dla procedur obsługi zdarzeń integracji (takich jak metoda wywołania zwrotnego). Procedura obsługi zdarzeń określa miejsce odbierania i przetwarzania komunikatów o zdarzeniach określonego typu.

Program obsługi zdarzeń najpierw odbiera wystąpienie zdarzenia z magistrali zdarzeń. Następnie lokalizuje składnik do przetworzenia związane z tym zdarzeniem integracji, propagując i utrwalając zdarzenie jako zmianę stanu w mikrousłudze odbiorcy. Jeśli na przykład zdarzenie ProductPriceChanged pochodzi z mikrousługi katalogu, jest obsługiwane w mikrousłudze koszyka i zmienia stan mikrousługi koszyka odbiorcy, jak pokazano w poniższym kodzie.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Program obsługi zdarzeń musi sprawdzić, czy produkt istnieje w dowolnym z wystąpień koszyka. Aktualizuje również cenę elementu dla każdego powiązanego elementu wiersza koszyka. Na koniec tworzy alert, który ma być wyświetlany użytkownikowi na temat zmiany ceny, jak pokazano na rysunku 6–24.

Screenshot of a browser showing the price change notification on the user cart.

Rysunek 6–24. Wyświetlanie zmiany ceny elementu w koszyku zgodnie ze zdarzeniami integracji

Idempotency in update message events (Idempotentność w zdarzeniach komunikatów aktualizacji)

Ważnym aspektem zdarzeń komunikatów aktualizacji jest to, że awaria w dowolnym momencie komunikacji powinna spowodować ponowne pobranie komunikatu. W przeciwnym razie zadanie w tle może spróbować opublikować wydarzenie, które zostało już opublikowane, tworząc warunek wyścigu. Upewnij się, że aktualizacje są idempotentne lub zapewniają wystarczającą ilość informacji, aby upewnić się, że można wykryć duplikat, odrzucić je i wysłać z powrotem tylko jedną odpowiedź.

Jak wspomniano wcześniej, idempotencyjność oznacza, że można wykonać operację wiele razy bez zmiany wyniku. W środowisku obsługi komunikatów, tak jak podczas komunikowania zdarzeń, zdarzenie jest idempotentne, jeśli może być dostarczane wiele razy bez zmiany wyniku mikrousługi odbiorcy. Może to być konieczne ze względu na charakter samego zdarzenia lub ze względu na sposób, w jaki system obsługuje zdarzenie. Idempotentność komunikatów jest ważna w każdej aplikacji korzystającej z komunikatów, a nie tylko w aplikacjach implementujących wzorzec magistrali zdarzeń.

Przykładem operacji idempotentnej jest instrukcja SQL, która wstawia dane do tabeli tylko wtedy, gdy te dane nie są jeszcze w tabeli. Nie ma znaczenia, ile razy uruchamiasz instrukcję SQL; wynik będzie taki sam — tabela będzie zawierać te dane. Takie idempotentność może być również konieczna w przypadku obsługi komunikatów, jeśli potencjalnie mogą być wysyłane komunikaty i dlatego przetwarzane więcej niż raz. Jeśli na przykład logika ponawiania powoduje, że nadawca wysyła dokładnie ten sam komunikat więcej niż raz, musisz upewnić się, że jest idempotentny.

Istnieje możliwość zaprojektowania komunikatów idempotentnych. Możesz na przykład utworzyć zdarzenie z komunikatem "ustaw cenę produktu na 25 USD" zamiast "dodaj $5 do ceny produktu". Pierwszy komunikat można bezpiecznie przetworzyć dowolną liczbę razy, a wynik będzie taki sam. To nieprawda dla drugiego komunikatu. Ale nawet w pierwszym przypadku możesz nie chcieć przetworzyć pierwszego zdarzenia, ponieważ system mógł również wysłać nowsze zdarzenie zmiany ceny i byłoby zastąpienie nowej ceny.

Innym przykładem może być zdarzenie zakończone zamówieniem, które jest propagowane do wielu subskrybentów. Aplikacja musi upewnić się, że informacje o kolejności są aktualizowane w innych systemach tylko raz, nawet jeśli istnieją zduplikowane zdarzenia komunikatów dla tego samego zdarzenia zakończonego zamówieniem.

Wygodne jest posiadanie jakiejś tożsamości na zdarzenie, dzięki czemu można utworzyć logikę, która wymusza przetwarzanie każdego zdarzenia tylko raz na odbiorcę.

Niektóre przetwarzanie komunikatów jest z natury idempotentne. Na przykład jeśli system generuje miniatury obrazów, może nie mieć znaczenia, ile razy komunikat o wygenerowanej miniaturze jest przetwarzany; wynikiem jest to, że miniatury są generowane i są takie same za każdym razem. Z drugiej strony operacje, takie jak wywoływanie bramy płatności w celu naliczania opłaty za kartę kredytową, w ogóle nie mogą być idempotentne. W takich przypadkach należy upewnić się, że przetwarzanie komunikatu wiele razy ma oczekiwany efekt.

Dodatkowe zasoby

Deduplikacja komunikatów zdarzeń integracji

Możesz upewnić się, że zdarzenia komunikatów są wysyłane i przetwarzane tylko raz na subskrybenta na różnych poziomach. Jednym ze sposobów jest użycie funkcji deduplikacji oferowanej przez używaną infrastrukturę obsługi komunikatów. Innym jest zaimplementowanie niestandardowej logiki w docelowej mikrousłudze. Sprawdzanie poprawności zarówno na poziomie transportu, jak i na poziomie aplikacji jest najlepszym rozwiązaniem.

Deduplikowanie zdarzeń komunikatów na poziomie programu EventHandler

Jednym ze sposobów upewnienia się, że zdarzenie jest przetwarzane tylko raz przez dowolny odbiornik, jest zaimplementowanie określonej logiki podczas przetwarzania zdarzeń komunikatów w programach obsługi zdarzeń. Na przykład jest to metoda używana w aplikacji eShopOnContainers, jak widać w kodzie źródłowym klasy UserCheckoutAcceptedIntegrationEventHandler po odebraniu UserCheckoutAcceptedIntegrationEvent zdarzenia integracji. (W tym przypadku obiekt CreateOrderCommand jest opakowany IdentifiedCommandza pomocą elementu , używając eventMsg.RequestId identyfikatora jako , przed wysłaniem go do programu obsługi poleceń).

Deduplikacja komunikatów podczas korzystania z biblioteki RabbitMQ

Gdy występują sporadyczne awarie sieci, komunikaty mogą być zduplikowane, a odbiornik komunikatów musi być gotowy do obsługi tych zduplikowanych komunikatów. Jeśli to możliwe, odbiorcy powinni obsługiwać komunikaty w sposób idempotentny, co jest lepsze niż jawne obsługiwanie ich z deduplikacją.

Zgodnie z dokumentacją RabbitMQ" "Jeśli komunikat jest dostarczany do konsumenta, a następnie ponownie w kolejce (ponieważ nie został potwierdzony przed usunięciem połączenia konsumenta, na przykład), RabbitMQ ustawi ponownie flagę redelivered na nim, gdy zostanie dostarczony ponownie (czy do tego samego konsumenta, czy innego).

Jeśli ustawiono flagę "redelivered", odbiorca musi to uwzględnić, ponieważ komunikat mógł już zostać przetworzony. Ale to nie jest gwarantowane; komunikat mógł nigdy nie dotrzeć do odbiornika po opuszczeniu brokera komunikatów, być może z powodu problemów z siecią. Z drugiej strony, jeśli flaga "redelivered" nie jest ustawiona, gwarantowana jest, że wiadomość nie została wysłana więcej niż raz. W związku z tym odbiorca musi deduplikować komunikaty lub przetwarzać komunikaty w sposób idempotentny tylko wtedy, gdy flaga "redelivered" jest ustawiona w komunikacie.

Dodatkowe zasoby