Dela via


Prenumerera på händelser

Dricks

Det här innehållet är ett utdrag från eBook, .NET Microservices Architecture for Containerized .NET Applications, tillgängligt på .NET Docs eller som en kostnadsfri nedladdningsbar PDF som kan läsas offline.

.NET Microservices Architecture for Containerized .NET Applications eBook cover thumbnail.

Det första steget för att använda händelsebussen är att prenumerera mikrotjänsterna på de händelser som de vill ta emot. Den funktionen bör göras i mottagarmikrotjänsterna.

Följande enkla kod visar vad varje mottagarmikrotjänst behöver implementera när tjänsten startas (dvs. i Startup klassen) så att den prenumererar på de händelser den behöver. I det här fallet basket-api måste mikrotjänsten prenumerera ProductPriceChangedIntegrationEvent på och meddelandena OrderStartedIntegrationEvent .

När du till ProductPriceChangedIntegrationEvent exempel prenumererar på händelsen gör det varukorgens mikrotjänst medveten om eventuella ändringar i produktpriset och låter den varna användaren om ändringen om produkten finns i användarens korg.

var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

eventBus.Subscribe<ProductPriceChangedIntegrationEvent,
                   ProductPriceChangedIntegrationEventHandler>();

eventBus.Subscribe<OrderStartedIntegrationEvent,
                   OrderStartedIntegrationEventHandler>();

När den här koden har körts lyssnar prenumerantens mikrotjänst via RabbitMQ-kanaler. När ett meddelande av typen ProductPriceChangedIntegrationEvent kommer anropar koden händelsehanteraren som skickas till den och bearbetar händelsen.

Publicera händelser via händelsebussen

Slutligen publicerar meddelandesändaren (origin microservice) integreringshändelserna med kod som liknar följande exempel. (Den här metoden är ett förenklat exempel som inte tar hänsyn till atomitet.) Du implementerar liknande kod när en händelse måste spridas över flera mikrotjänster, vanligtvis direkt efter att du har checkat in data eller transaktioner från ursprungsmikrotjänsten.

Först skulle event bus-implementeringsobjektet (baserat på RabbitMQ eller baserat på en servicebuss) matas in i styrenhetskonstruktorn, som i följande kod:

[Route("api/v1/[controller]")]
public class CatalogController : ControllerBase
{
    private readonly CatalogContext _context;
    private readonly IOptionsSnapshot<Settings> _settings;
    private readonly IEventBus _eventBus;

    public CatalogController(CatalogContext context,
        IOptionsSnapshot<Settings> settings,
        IEventBus eventBus)
    {
        _context = context;
        _settings = settings;
        _eventBus = eventBus;
    }
    // ...
}

Sedan använder du den från kontrollantens metoder, till exempel i metoden UpdateProduct:

[Route("items")]
[HttpPost]
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem product)
{
    var item = await _context.CatalogItems.SingleOrDefaultAsync(
        i => i.Id == product.Id);
    // ...
    if (item.Price != product.Price)
    {
        var oldPrice = item.Price;
        item.Price = product.Price;
        _context.CatalogItems.Update(item);
        var @event = new ProductPriceChangedIntegrationEvent(item.Id,
            item.Price,
            oldPrice);
        // Commit changes in original transaction
        await _context.SaveChangesAsync();
        // Publish integration event to the event bus
        // (RabbitMQ or a service bus underneath)
        _eventBus.Publish(@event);
        // ...
    }
    // ...
}

I det här fallet, eftersom ursprungsmikrotjänsten är en enkel CRUD-mikrotjänst, placeras koden direkt i en webb-API-kontrollant.

I mer avancerade mikrotjänster, som när du använder CQRS-metoder, kan den implementeras i CommandHandler -klassen i Handle() -metoden.

Utforma atomitet och återhämtning vid publicering till händelsebussen

När du publicerar integreringshändelser via ett distribuerat meddelandesystem som din händelsebuss, har du problem med att atomiskt uppdatera den ursprungliga databasen och publicera en händelse (det vill sa antingen att båda åtgärderna har slutförts eller ingen av dem). I det förenklade exemplet som visades tidigare checkar koden till exempel in data till databasen när produktpriset ändras och publicerar sedan ett ProductPriceChangedIntegrationEvent-meddelande. Inledningsvis kan det se viktigt ut att dessa två åtgärder utförs atomiskt. Men om du använder en distribuerad transaktion som involverar databasen och meddelandekoordinatorn, som du gör i äldre system som Microsoft Message Queuing (MSMQ), rekommenderas inte den här metoden av de skäl som beskrivs i CAP-satsen.

I grund och botten använder du mikrotjänster för att skapa skalbara och högtillgängliga system. För att förenkla något säger CAP-satsen att du inte kan skapa en (distribuerad) databas (eller en mikrotjänst som äger dess modell) som ständigt är tillgänglig, starkt konsekvent och tolerant mot någon partition. Du måste välja två av dessa tre egenskaper.

I mikrotjänstbaserade arkitekturer bör du välja tillgänglighet och tolerans, och du bör framhäva stark konsekvens. I de flesta moderna mikrotjänstbaserade program vill du därför vanligtvis inte använda distribuerade transaktioner i meddelanden, som när du implementerar distribuerade transaktioner baserat på Windows Distributed Transaction Coordinator (DTC) med MSMQ.

Nu går vi tillbaka till det första problemet och dess exempel. Om tjänsten kraschar efter att databasen har uppdaterats (i det här fallet direkt efter kodraden med _context.SaveChangesAsync()), men innan integrationshändelsen publiceras, kan det övergripande systemet bli inkonsekvent. Den här metoden kan vara affärskritisk, beroende på vilken verksamhet du har att göra med.

Som tidigare nämnts i arkitekturavsnittet kan du ha flera metoder för att hantera det här problemet:

  • Använd det fullständiga mönstret händelsekällor.

  • Använda utvinning av transaktionsloggar.

  • Använd utkorgsmönstret. Det här är en transaktionstabell som lagrar integreringshändelserna (utökar den lokala transaktionen).

I det här scenariot är användning av det fullständiga ES-mönstret (Event Sourcing) en av de bästa metoderna, om inte det bästa. Men i många programscenarier kanske du inte kan implementera ett fullständigt ES-system. ES innebär att endast lagra domänhändelser i transaktionsdatabasen i stället för att lagra aktuella tillståndsdata. Lagring av endast domänhändelser kan ha stora fördelar, till exempel att ha systemets historik tillgänglig och att kunna fastställa systemets tillstånd när som helst tidigare. Om du implementerar ett fullständigt ES-system måste du dock omcertifiera det mesta av systemet och introducera många andra komplexiteter och krav. Du vill till exempel använda en databas som är specifikt utformad för händelsekällor, till exempel Event Store eller en dokumentorienterad databas som Azure Cosmos DB, MongoDB, Cassandra, CouchDB eller RavenDB. ES är en bra metod för det här problemet, men inte den enklaste lösningen om du inte redan är bekant med händelsekällor.

Alternativet att använda utvinning av transaktionsloggar ser inledningsvis transparent ut. Om du vill använda den här metoden måste mikrotjänsten dock kopplas till RDBMS-transaktionsloggen, till exempel SQL Server-transaktionsloggen. Detta tillvägagångssätt är förmodligen inte önskvärt. En annan nackdel är att de uppdateringar på låg nivå som registrerats i transaktionsloggen kanske inte är på samma nivå som dina integreringshändelser på hög nivå. I så fall kan processen med att omvänt konstruera dessa transaktionsloggåtgärder vara svår.

En balanserad metod är en blandning av en transaktionsdatabastabell och ett förenklat ES-mönster. Du kan använda ett tillstånd som "redo att publicera händelsen", som du angav i den ursprungliga händelsen när du checkar in den i tabellen för integrationshändelser. Sedan försöker du publicera händelsen till händelsebussen. Om åtgärden publish-event lyckas startar du en annan transaktion i ursprungstjänsten och flyttar tillståndet från "redo att publicera händelsen" till "händelsen har redan publicerats".

Om åtgärden publish-event i händelsebussen misslyckas är data fortfarande inte inkonsekventa inom ursprungsmikrotjänsten– den är fortfarande markerad som "redo att publicera händelsen", och när det gäller resten av tjänsterna blir den till slut konsekvent. Du kan alltid ha bakgrundsjobb som kontrollerar tillståndet för transaktionerna eller integrationshändelserna. Om jobbet hittar en händelse i tillståndet "redo att publicera händelsen" kan den försöka publicera händelsen på nytt till händelsebussen.

Observera att med den här metoden bevarar du bara integreringshändelserna för varje ursprungsmikrotjänst och endast de händelser som du vill kommunicera med andra mikrotjänster eller externa system. I ett fullständigt ES-system lagrar du däremot även alla domänhändelser.

Därför är denna balanserade metod ett förenklat ES-system. Du behöver en lista över integreringshändelser med deras aktuella tillstånd ("redo att publicera" jämfört med "publicerad"). Men du behöver bara implementera dessa tillstånd för integrationshändelserna. Och i den här metoden behöver du inte lagra alla dina domändata som händelser i transaktionsdatabasen, som i ett fullständigt ES-system.

Om du redan använder en relationsdatabas kan du använda en transaktionstabell för att lagra integrationshändelser. För att uppnå atomicitet i ditt program använder du en tvåstegsprocess baserad på lokala transaktioner. I grund och botten har du en IntegrationEvent-tabell i samma databas där du har dina domänentiteter. Den tabellen fungerar som en försäkring för att uppnå atomitet så att du inkluderar bevarade integreringshändelser i samma transaktioner som genomför dina domändata.

Steg för steg går processen så här:

  1. Programmet påbörjar en lokal databastransaktion.

  2. Den uppdaterar sedan tillståndet för dina domänentiteter och infogar en händelse i integrationshändelsetabellen.

  3. Slutligen checkar den in transaktionen, så du får önskad atomitet och sedan

  4. Du publicerar händelsen på något sätt (nästa).

När du implementerar stegen för att publicera händelserna har du följande alternativ:

  • Publicera integrationshändelsen direkt efter att transaktionen har genomförts och använd en annan lokal transaktion för att markera händelserna i tabellen som publicerade. Använd sedan tabellen precis som en artefakt för att spåra integreringshändelser i händelse av problem i fjärrmikrotjänster och utföra kompenserande åtgärder baserat på de lagrade integrationshändelserna.

  • Använd tabellen som en typ av kö. En separat programtråd eller process frågar integrationshändelsetabellen, publicerar händelserna till händelsebussen och använder sedan en lokal transaktion för att markera händelserna som publicerade.

Bild 6–22 visar arkitekturen för den första av dessa metoder.

Diagram of atomicity when publishing without a worker microservice.

Bild 6-22. Atomicitet vid publicering av händelser till händelsebussen

Metoden som illustreras i bild 6-22 saknar ytterligare en mikrotjänst för arbetare som ansvarar för att kontrollera och bekräfta att de publicerade integrationshändelserna lyckas. Vid fel kan den ytterligare checker worker-mikrotjänsten läsa händelser från tabellen och publicera om dem, det vill säga upprepa steg nummer 2.

Om den andra metoden: du använder tabellen EventLog som en kö och använder alltid en arbetsmikrotjänst för att publicera meddelandena. I så fall är processen som den som visas i bild 6–23. Detta visar ytterligare en mikrotjänst och tabellen är den enda källan när händelser publiceras.

Diagram of atomicity when publishing with a worker microservice.

Bild 6-23. Atomicitet vid publicering av händelser till händelsebussen med en arbetsmikrotjänst

För enkelhetens skull använder exemplet eShopOnContainers den första metoden (utan ytterligare processer eller kontrollmikrotjänster) plus händelsebussen. Exemplet eShopOnContainers hanterar dock inte alla möjliga felfall. I ett verkligt program som distribueras till molnet måste du ta till dig det faktum att problem uppstår så småningom, och du måste implementera den kontrollen och skicka logiken igen. Att använda tabellen som en kö kan vara effektivare än den första metoden om du har tabellen som en enda källa till händelser när du publicerar dem (med arbetaren) via händelsebussen.

Implementera atomicitet när du publicerar integrationshändelser via händelsebussen

Följande kod visar hur du kan skapa en enda transaktion med flera DbContext-objekt – en kontext som är relaterad till de ursprungliga data som uppdateras och den andra kontexten som är relaterad till tabellen IntegrationEventLog.

Transaktionen i exempelkoden nedan är inte motståndskraftig om anslutningar till databasen har något problem vid den tidpunkt då koden körs. Detta kan inträffa i molnbaserade system som Azure SQL DB, som kan flytta databaser mellan servrar. Information om hur du implementerar elastiska transaktioner i flera kontexter finns i avsnittet Implementera elastiska Entity Framework Core SQL-anslutningar senare i den här guiden.

För tydlighetens skull visar följande exempel hela processen i en enda kod. EShopOnContainers-implementeringen omstruktureras dock och delar upp den här logiken i flera klasser så att det är enklare att underhålla.

// Update Product from the Catalog microservice
//
public async Task<IActionResult> UpdateProduct([FromBody]CatalogItem productToUpdate)
{
  var catalogItem =
       await _catalogContext.CatalogItems.SingleOrDefaultAsync(i => i.Id ==
                                                               productToUpdate.Id);
  if (catalogItem == null) return NotFound();

  bool raiseProductPriceChangedEvent = false;
  IntegrationEvent priceChangedEvent = null;

  if (catalogItem.Price != productToUpdate.Price)
          raiseProductPriceChangedEvent = true;

  if (raiseProductPriceChangedEvent) // Create event if price has changed
  {
      var oldPrice = catalogItem.Price;
      priceChangedEvent = new ProductPriceChangedIntegrationEvent(catalogItem.Id,
                                                                  productToUpdate.Price,
                                                                  oldPrice);
  }
  // Update current product
  catalogItem = productToUpdate;

  // Just save the updated product if the Product's Price hasn't changed.
  if (!raiseProductPriceChangedEvent)
  {
      await _catalogContext.SaveChangesAsync();
  }
  else  // Publish to event bus only if product price changed
  {
        // Achieving atomicity between original DB and the IntegrationEventLog
        // with a local transaction
        using (var transaction = _catalogContext.Database.BeginTransaction())
        {
           _catalogContext.CatalogItems.Update(catalogItem);
           await _catalogContext.SaveChangesAsync();

           await _integrationEventLogService.SaveEventAsync(priceChangedEvent);

           transaction.Commit();
        }

      // Publish the integration event through the event bus
      _eventBus.Publish(priceChangedEvent);

      _integrationEventLogService.MarkEventAsPublishedAsync(
                                                priceChangedEvent);
  }

  return Ok();
}

När integrationshändelsen ProductPriceChangedIntegrationEvent har skapats innehåller transaktionen som lagrar den ursprungliga domänåtgärden (uppdatera katalogobjektet) även beständigheten för händelsen i tabellen EventLog. Detta gör det till en enda transaktion och du kommer alltid att kunna kontrollera om händelsemeddelanden har skickats.

Händelseloggtabellen uppdateras atomiskt med den ursprungliga databasåtgärden med hjälp av en lokal transaktion mot samma databas. Om någon av åtgärderna misslyckas utlöses ett undantag och transaktionen återställer alla slutförda åtgärder, vilket bibehåller konsekvensen mellan domänåtgärderna och de händelsemeddelanden som sparats i tabellen.

Ta emot meddelanden från prenumerationer: händelsehanterare i mottagarmikrotjänster

Förutom logiken för händelseprenumeration måste du implementera den interna koden för integrationshändelsehanterare (till exempel en återanropsmetod). Händelsehanteraren är där du anger var händelsemeddelanden av en viss typ ska tas emot och bearbetas.

En händelsehanterare tar först emot en händelseinstans från händelsebussen. Sedan letar den upp komponenten som ska bearbetas relaterad till den integrationshändelsen, som sprider och bevarar händelsen som en ändring i tillståndet i mottagarmikrotjänsten. Om en ProductPriceChanged-händelse till exempel kommer från katalogmikrotjänsten hanteras den i korgens mikrotjänst och ändrar även tillståndet i den här mottagarkorgens mikrotjänst, enligt följande kod.

namespace Microsoft.eShopOnContainers.Services.Basket.API.IntegrationEvents.EventHandling
{
    public class ProductPriceChangedIntegrationEventHandler :
        IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
    {
        private readonly IBasketRepository _repository;

        public ProductPriceChangedIntegrationEventHandler(
            IBasketRepository repository)
        {
            _repository = repository;
        }

        public async Task Handle(ProductPriceChangedIntegrationEvent @event)
        {
            var userIds = await _repository.GetUsers();
            foreach (var id in userIds)
            {
                var basket = await _repository.GetBasket(id);
                await UpdatePriceInBasketItems(@event.ProductId, @event.NewPrice, basket);
            }
        }

        private async Task UpdatePriceInBasketItems(int productId, decimal newPrice,
            CustomerBasket basket)
        {
            var itemsToUpdate = basket?.Items?.Where(x => int.Parse(x.ProductId) ==
                productId).ToList();
            if (itemsToUpdate != null)
            {
                foreach (var item in itemsToUpdate)
                {
                    if(item.UnitPrice != newPrice)
                    {
                        var originalPrice = item.UnitPrice;
                        item.UnitPrice = newPrice;
                        item.OldUnitPrice = originalPrice;
                    }
                }
                await _repository.UpdateBasket(basket);
            }
        }
    }
}

Händelsehanteraren måste kontrollera om produkten finns i någon av korginstanserna. Den uppdaterar även artikelpriset för varje relaterat korgradsobjekt. Slutligen skapar den en avisering som ska visas för användaren om prisändringen, enligt bild 6–24.

Screenshot of a browser showing the price change notification on the user cart.

Bild 6-24. Visa en artikelprisändring i en korg enligt vad som kommuniceras av integrationshändelser

Idempotens i uppdateringsmeddelandehändelser

En viktig aspekt av uppdateringsmeddelandehändelser är att ett fel när som helst i kommunikationen ska leda till att meddelandet görs ett nytt försök. Annars kan en bakgrundsaktivitet försöka publicera en händelse som redan har publicerats, vilket skapar ett konkurrenstillstånd. Kontrollera att uppdateringarna antingen är idempotent eller att de ger tillräckligt med information för att säkerställa att du kan identifiera en dubblett, ignorera den och skicka tillbaka endast ett svar.

Som tidigare nämnts innebär idempotens att en åtgärd kan utföras flera gånger utan att resultatet ändras. I en meddelandemiljö, som vid kommunikation av händelser, är en händelse idempotent om den kan levereras flera gånger utan att ändra resultatet för mottagarens mikrotjänst. Detta kan vara nödvändigt på grund av själva händelsens art eller på grund av hur systemet hanterar händelsen. Meddelande-idempotens är viktigt i alla program som använder meddelanden, inte bara i program som implementerar event bus-mönstret.

Ett exempel på en idempotent-åtgärd är en SQL-instruktion som endast infogar data i en tabell om dessa data inte redan finns i tabellen. Det spelar ingen roll hur många gånger du kör infognings-SQL-instruktionen. resultatet blir detsamma – tabellen innehåller dessa data. Idempotens som detta kan också vara nödvändigt när du hanterar meddelanden om meddelandena potentiellt kan skickas och därför bearbetas mer än en gång. Om omförsökslogik till exempel gör att en avsändare skickar exakt samma meddelande mer än en gång måste du se till att det är idempotent.

Det går att utforma idempotentmeddelanden. Du kan till exempel skapa en händelse som säger "ange produktpriset till 25 USD" i stället för att "lägga till $5 i produktpriset". Du kan på ett säkert sätt bearbeta det första meddelandet när som helst och resultatet blir detsamma. Det stämmer inte för det andra meddelandet. Men även i det första fallet kanske du inte vill bearbeta den första händelsen, eftersom systemet också kunde ha skickat en nyare prisändringshändelse och du skulle skriva över det nya priset.

Ett annat exempel kan vara en order-completed händelse som sprids till flera prenumeranter. Appen måste se till att orderinformationen uppdateras i andra system bara en gång, även om det finns duplicerade meddelandehändelser för samma beställningsfördelade händelse.

Det är praktiskt att ha någon form av identitet per händelse så att du kan skapa logik som framtvingar att varje händelse endast bearbetas en gång per mottagare.

Viss meddelandebearbetning är idempotent. Om ett system till exempel genererar miniatyrbilder kanske det inte spelar någon roll hur många gånger meddelandet om den genererade miniatyrbilden bearbetas. resultatet är att miniatyrbilderna genereras och de är desamma varje gång. Å andra sidan kanske åtgärder som att anropa en betalningsgateway för att debitera ett kreditkort inte alls är idempotent. I dessa fall måste du se till att bearbetningen av ett meddelande flera gånger har den effekt som du förväntar dig.

Ytterligare resurser

Deduplicera meddelanden om integrationshändelser

Du kan se till att meddelandehändelser endast skickas och bearbetas en gång per prenumerant på olika nivåer. Ett sätt är att använda en dedupliceringsfunktion som erbjuds av den meddelandeinfrastruktur som du använder. En annan är att implementera anpassad logik i din målmikrotjänst. Att ha valideringar på både transportnivå och programnivå är ditt bästa val.

Deduplicera meddelandehändelser på EventHandler-nivå

Ett sätt att se till att en händelse endast bearbetas en gång av en mottagare är genom att implementera viss logik när du bearbetar meddelandehändelserna i händelsehanterare. Det är till exempel den metod som används i eShopOnContainers-programmet, som du kan se i källkoden för klassen UserCheckoutAcceptedIntegrationEventHandler när den tar emot en UserCheckoutAcceptedIntegrationEvent integrationshändelse. (I det här fallet CreateOrderCommand omsluts den med en IdentifiedCommand, med som eventMsg.RequestId identifierare, innan den skickas till kommandohanteraren).

Deduplicera meddelanden när du använder RabbitMQ

När tillfälliga nätverksfel inträffar kan meddelanden dupliceras och meddelandemottagaren måste vara redo att hantera dessa duplicerade meddelanden. Om möjligt bör mottagarna hantera meddelanden på ett idempotent sätt, vilket är bättre än att uttryckligen hantera dem med deduplicering.

Enligt RabbitMQ-dokumentationen " Om ett meddelande levereras till en konsument och sedan skickas tillbaka (eftersom det inte bekräftades innan konsumentanslutningen till exempel släpptes) kommer RabbitMQ att ange flaggan för omleverans på den när den levereras igen (oavsett om den är till samma konsument eller en annan).

Om flaggan "redelivered" har angetts måste mottagaren ta hänsyn till det, eftersom meddelandet kanske redan har bearbetats. Men det är inte garanterat; meddelandet kanske aldrig har nått mottagaren efter att det lämnat meddelandekoordinatorn, kanske på grund av nätverksproblem. Om flaggan "redelivered" å andra sidan inte har angetts garanteras det att meddelandet inte har skickats mer än en gång. Därför måste mottagaren deduplicera meddelanden eller bearbeta meddelanden på ett idempotent sätt endast om flaggan "skickas igen" anges i meddelandet.

Ytterligare resurser