Implementace spolehlivého zasílání zpráv v distribuovaných systémech může být náročná. Tento článek popisuje, jak používat model Transakční pošta k odeslání pro spolehlivé zasílání zpráv a zaručené doručení událostí, důležitou součástí podpůrného zpracování idempotentní zprávy. K tomu použijete transakční dávky služby Azure Cosmos DB a kanál změn v kombinaci se službou Azure Service Bus.
Přehled
Architektury mikroslužeb jsou stále oblíbenější a ukazují slib při řešení problémů, jako je škálovatelnost, udržovatelnost a flexibilita, zejména ve velkých aplikacích. Tento model architektury ale přináší také výzvy, pokud jde o zpracování dat. V distribuovanýchaplikacích Pro podporu takového scénáře obvykle používáte řešení pro zasílání zpráv, jako je RabbitMQ, Kafka nebo Azure Service Bus, které distribuuje data (události) z jedné služby přes sběrnici zasílání zpráv do jiných služeb aplikace. Interní nebo externí spotřebitelé se pak můžou přihlásit k odběru těchto zpráv a dostávat oznámení o změnách hned po manipulaci s daty.
Známým příkladem v této oblasti je systém objednávání: když chce uživatel vytvořit objednávku, Ordering
služba přijímá data z klientské aplikace prostřednictvím koncového bodu REST. Namapuje datovou část na interní reprezentaci Order
objektu, aby ověřila data. Po úspěšném potvrzení databáze publikuje OrderCreated
událost do sběrnice zpráv. Jakákoli jiná služba, která má zájem o nové objednávky (například Inventory
Invoicing
službu), by se přihlásila k OrderCreated
odběru zpráv, zpracovávala je a ukládala do vlastní databáze.
Následující pseudokód ukazuje, jak tento proces obvykle vypadá z Ordering
pohledu služby:
CreateNewOrder(CreateOrderDto order){
// Validate the incoming data.
...
// Apply business logic.
...
// Save the object to the database.
var result = _orderRespository.Create(order);
// Publish the respective event.
_messagingService.Publish(new OrderCreatedEvent(result));
return Ok();
}
Tento přístup funguje dobře, dokud nedojde k chybě mezi uložením objektu objednávky a publikováním odpovídající události. Odesílání události může v tuto chvíli selhat z mnoha důvodů:
- Chyby sítě
- Výpadek služby zpráv
- Selhání hostitele
Ať je chyba jakákoli, výsledkem je, že OrderCreated
událost nemůže být publikována do sběrnice zpráv. Ostatní služby nebudou upozorněny na vytvoření objednávky. Služba Ordering
se teď musí postarat o různé věci, které nesouvisí se skutečným obchodním procesem. Musí mít přehled o událostech, které je stále potřeba umístit do sběrnice zpráv, jakmile bude zase online. Dokonce i nejhorší případ může nastat: nekonzistence dat v aplikaci kvůli ztraceným událostem.
Řešení
Existuje dobře známý vzor s názvem Transakční pošta k odeslání, který vám pomůže vyhnout se těmto situacím. Zajišťuje, aby se události ukládaly do úložiště dat (obvykle v tabulce Pošta k odeslání ve vaší databázi), než se nakonec nasdílí do zprostředkovatele zpráv. Pokud jsou obchodní objekt a odpovídající události uloženy ve stejné databázové transakci, je zaručeno, že se neztratí žádná data. Všechno se potvrdí, jinak se všechno vrátí, pokud dojde k chybě. Pokud chcete událost nakonec publikovat, jiná služba nebo pracovní proces dotazuje tabulku Pošta k odeslání na neošetřené položky, publikuje události a označí je jako zpracované. Tento model zajišťuje, že po vytvoření nebo úpravě obchodního objektu nedojde ke ztrátě událostí.
Stáhněte si soubor aplikace Visio s touto architekturou.
V relační databázi je implementace modelu jednoduchá. Pokud služba například používá Entity Framework Core, použije kontext Entity Framework k vytvoření databázové transakce, uloží obchodní objekt a událost a potvrdí transakci nebo provede vrácení zpět. Pracovní služba, která zpracovává události, se také snadno implementuje: pravidelně dotazuje tabulku Pošta k odeslání pro nové položky, publikuje nově vložené události do sběrnice zpráv a nakonec označí tyto položky jako zpracované.
V praxi nejsou věci tak snadné, jak by se mohly podívat na první pohled. Nejdůležitější je zajistit, aby se pořadí událostí zachovalo, aby OrderUpdated
se událost nepublikovala před událostí OrderCreated
.
Implementace ve službě Azure Cosmos DB
Tato část ukazuje, jak implementovat model Transakční pošta k odeslání ve službě Azure Cosmos DB, aby bylo možné dosáhnout spolehlivého zasílání zpráv v pořadí mezi různými službami pomocí kanálu změn služby Azure Cosmos DB a služby Service Bus. Ukazuje ukázkovou službu, která spravuje Contact
objekty (FirstName
, LastName
, Email
, Company
informace atd.). Používá vzor oddělení odpovědnosti příkazů a dotazů (CQRS) a řídí se základními koncepty návrhu řízeného doménou (DDD). Vzorový kód pro implementaci najdete na GitHubu.
Objekt Contact
v ukázkové službě má následující strukturu:
{
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false
}
Jakmile Contact
se vytvoří nebo aktualizuje, vygeneruje události obsahující informace o aktuální změně. Mimo jiné můžou být události domény:
ContactCreated
. Vyvolá se při přidání kontaktu.ContactNameUpdated
. Vyvolání přiFirstName
změně neboLastName
změně.ContactEmailUpdated
. Vyvolá se při aktualizaci e-mailové adresy.ContactCompanyUpdated
. Vyvolá se při změně některé z vlastností společnosti.
Transakční dávky
Pokud chcete tento model implementovat, musíte zajistit Contact
, aby obchodní objekt a odpovídající události byly uloženy ve stejné databázové transakci. Ve službě Azure Cosmos DB fungují transakce jinak než v relačních databázových systémech. Transakce Azure Cosmos DB, označované jako transakční dávky, pracují s jedním logickým oddílem, takže zaručují atomicitu, konzistenci, izolaci a odolnost (ACID). V transakční dávkové operaci nemůžete uložit dva dokumenty v různých kontejnerech nebo logických oddílech. Pro ukázkovou službu to znamená, že obchodní objekt i událost nebo události budou vloženy do stejného kontejneru a logického oddílu.
Kontext, úložiště a UnitOfWork
Jádrem ukázkové implementace je kontext kontejneru, který sleduje objekty uložené ve stejné transakční dávce. Udržuje seznam vytvořených a upravených objektů a funguje v jednom kontejneru Azure Cosmos DB. Rozhraní pro něj vypadá takto:
public interface IContainerContext
{
public Container Container { get; }
public List<IDataObject<Entity>> DataObjects { get; }
public void Add(IDataObject<Entity> entity);
public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
public void Reset();
}
Seznam v komponentě kontextu kontejneru sleduje Contact
a DomainEvent
objekty. Obě budou vloženy do stejného kontejneru. To znamená, že ve stejném kontejneru Azure Cosmos DB je uloženo více typů objektů a k rozlišení mezi obchodním objektem a událostí použijte Type
vlastnost.
Pro každý typ je vyhrazené úložiště, které definuje a implementuje přístup k datům. Rozhraní Contact
úložiště poskytuje tyto metody:
public interface IContactsRepository
{
public void Create(Contact contact);
public Task<(Contact, string)> ReadAsync(Guid id, string etag);
public Task DeleteAsync(Guid id, string etag);
public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
public void Update(Contact contact, string etag);
}
Úložiště Event
vypadá podobně, s výjimkou jediné metody, která v úložišti vytváří nové události:
public interface IEventRepository
{
public void Create(ContactDomainEvent e);
}
Implementace obou rozhraní úložiště získávají odkaz prostřednictvím injektáže závislostí do jedné IContainerContext
instance, aby se zajistilo, že oba fungují ve stejném kontextu služby Azure Cosmos DB.
Poslední komponenta je UnitOfWork
, která potvrdí změny uchovávané v IContainerContext
instanci do služby Azure Cosmos DB:
public class UnitOfWork : IUnitOfWork
{
private readonly IContainerContext _context;
public IContactRepository ContactsRepo { get; }
public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
{
_context = ctx;
ContactsRepo = cRepo;
}
public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
{
return _context.SaveChangesAsync(cancellationToken);
}
}
Zpracování událostí: Vytvoření a publikování
Při každém vytvoření, změně nebo odstranění objektu Contact
(obnovitelné odstranění) služba vyvolá odpovídající událost. Jádrem poskytnutého řešení je kombinace návrhu řízeného doménou (DDD) a mediátora navrženého Jimmy Bogardem. Navrhuje udržování seznamu událostí, ke kterým došlo kvůli změnám objektu domény a publikování těchto událostí před uložením skutečného objektu do databáze.
Seznam změn se uchovává v samotném objektu domény, takže žádná jiná komponenta nemůže měnit řetěz událostí. Chování udržování událostí (IEvent
instancí) v objektu domény je definováno prostřednictvím rozhraní IEventEmitter<IEvent>
a implementováno v abstraktní DomainEntity
třídě:
public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
private readonly List<IEvent> _events = new();
[JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();
public virtual void AddEvent(IEvent domainEvent)
{
var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
if (i < 0)
{
_events.Add(domainEvent);
}
else
{
_events.RemoveAt(i);
_events.Insert(i, domainEvent);
}
}
[...]
[...]
}
Objekt Contact
vyvolává události domény. Entita Contact
se řídí základními koncepty DDD a konfiguruje settery vlastností domény jako soukromé. Ve třídě neexistují žádné veřejné settery. Místo toho nabízí metody pro manipulaci s interním stavem. V těchto metodách mohou být vyvolány vhodné události pro určitou změnu (například ContactNameUpdated
ContactEmailUpdated
) .
Tady je příklad aktualizací jména kontaktu. (Událost je vyvolána na konci metody.)
public void SetName(string firstName, string lastName)
{
if (string.IsNullOrWhiteSpace(firstName) ||
string.IsNullOrWhiteSpace(lastName))
{
throw new ArgumentException("FirstName or LastName cannot be empty");
}
Name = new Name(firstName, lastName);
if (IsNew) return;
AddEvent(new ContactNameUpdatedEvent(Id, Name));
ModifiedAt = DateTimeOffset.UtcNow;
}
Odpovídající ContactNameUpdatedEvent
, který sleduje změny, vypadá takto:
public class ContactNameUpdatedEvent : ContactDomainEvent
{
public Name Name { get; }
public ContactNameUpdatedEvent(Guid contactId, Name contactName) :
base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
{
Name = contactName;
}
}
Zatím se události právě protokolují do objektu domény a nic se neuloží do databáze nebo dokonce publikuje do zprostředkovatele zpráv. Po doporučení se seznam událostí zpracuje přímo před uložením obchodního objektu do úložiště dat. V tomto případě k tomu dochází v SaveChangesAsync
metodě IContainerContext
instance, která je implementována v privátní RaiseDomainEvents
metodě. (dObjs
je seznam sledovaných entit kontextu kontejneru.)
private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
var eventEmitters = new List<IEventEmitter<IEvent>>();
// Get all EventEmitters.
foreach (var o in dObjs)
if (o.Data is IEventEmitter<IEvent> ee)
eventEmitters.Add(ee);
// Raise events.
if (eventEmitters.Count <= 0) return;
foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
_mediator.Publish(evt);
}
Na posledním řádku se balíček MediatR , implementace mediátorového vzoru v jazyce C#, používá k publikování události v aplikaci. Je to možné, protože všechny události, jako je ContactNameUpdatedEvent
implementace INotification
rozhraní balíčku MediatR.
Tyto události je potřeba zpracovat odpovídající obslužnou rutinou. Zde přichází implementace IEventsRepository
do hry. Tady je ukázka NameUpdated
obslužné rutiny události:
public class ContactNameUpdatedHandler :
INotificationHandler<ContactNameUpdatedEvent>
{
private IEventRepository EventRepository { get; }
public ContactNameUpdatedHandler(IEventRepository eventRepo)
{
EventRepository = eventRepo;
}
public Task Handle(ContactNameUpdatedEvent notification,
CancellationToken cancellationToken)
{
EventRepository.Create(notification);
return Task.CompletedTask;
}
}
Instance IEventRepository
se vloží do třídy obslužné rutiny prostřednictvím konstruktoru. Jakmile ContactNameUpdatedEvent
se publikuje ve službě, Handle
vyvolá se metoda a použije instanci úložiště událostí k vytvoření objektu oznámení. Tento objekt oznámení se zase vloží do seznamu sledovaných objektů v objektu IContainerContext
a spojí objekty uložené ve stejné transakční dávce se službou Azure Cosmos DB.
Zatím kontext kontejneru ví, které objekty se mají zpracovat. Aby se nakonec sledované objekty zachovaly ve službě Azure Cosmos DB, IContainerContext
implementace vytvoří transakční dávku, přidá všechny relevantní objekty a spustí operaci s databází. Popsaný proces se zpracovává v SaveInTransactionalBatchAsync
metodě, která je vyvolána metodou SaveChangesAsync
.
Tady jsou důležité části implementace, kterou potřebujete k vytvoření a spuštění transakční dávky:
private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
CancellationToken cancellationToken)
{
if (DataObjects.Count > 0)
{
var pk = new PartitionKey(DataObjects[0].PartitionKey);
var tb = Container.CreateTransactionalBatch(pk);
DataObjects.ForEach(o =>
{
TransactionalBatchItemRequestOptions tro = null;
if (!string.IsNullOrWhiteSpace(o.Etag))
tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };
switch (o.State)
{
case EntityState.Created:
tb.CreateItem(o);
break;
case EntityState.Updated or EntityState.Deleted:
tb.ReplaceItem(o.Id, o, tro);
break;
}
});
var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
}
// Return copy of current list as result.
var result = new List<IDataObject<Entity>>(DataObjects);
// Work has been successfully done. Reset DataObjects list.
DataObjects.Clear();
return result;
}
Tady je přehled toho, jak proces zatím funguje (pro aktualizaci jména u objektu kontaktu):
- Klient chce aktualizovat jméno kontaktu. Metoda
SetName
je vyvolána u objektu kontaktu a vlastnosti jsou aktualizovány. - Událost
ContactNameUpdated
se přidá do seznamu událostí v objektu domény. - Vyvolá se metoda úložiště
Update
kontaktů, která přidá objekt domény do kontextu kontejneru. Objekt je nyní sledován. CommitAsync
je vyvolána vUnitOfWork
instanci, která následně voláSaveChangesAsync
kontext kontejneru.- V rámci
SaveChangesAsync
, všechny události v seznamu objektu domény jsou publikoványMediatR
instancí a jsou přidány prostřednictvím úložiště událostí do stejného kontextu kontejneru. - V souboru < a0/0
SaveChangesAsync
> se vytvoří.TransactionalBatch
Bude obsahovat objekt kontaktu i událost. - Spuštění
TransactionalBatch
a data se zapíšou do služby Azure Cosmos DB. SaveChangesAsync
aCommitAsync
úspěšně se vrátí.
Uchování
Jak vidíte v předchozích fragmentech kódu, všechny objekty uložené ve službě DataObject
Azure Cosmos DB se zabalí do instance. Tento objekt poskytuje společné vlastnosti:
ID
.PartitionKey
.Type
.State
.Updated
Podobně jakoCreated
v Azure Cosmos DB se neuchová.Etag
. Pro optimistické uzamčení.TTL
. Vlastnost Time To Live pro automatické vyčištění starých dokumentůData
. Obecný datový objekt.
Tyto vlastnosti jsou definovány v obecném rozhraní, které se volá IDataObject
a používá úložiště a kontext kontejneru:
public interface IDataObject<out T> where T : Entity
{
string Id { get; }
string PartitionKey { get; }
string Type { get; }
T Data { get; }
string Etag { get; set; }
int Ttl { get; }
EntityState State { get; set; }
}
Objekty zabalené v DataObject
instanci a uložené do databáze pak budou vypadat jako v této ukázce (Contact
a ContactNameUpdatedEvent
):
// Contact document/object. After creation.
{
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "contact",
"data": {
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false,
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
},
"ttl": -1,
"_etag": "\"180014cc-0000-1500-0000-614455330000\"",
"_ts": 1632301657
}
// After setting a new name, this is how an event document looks.
{
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "domainEvent",
"data": {
"name": {
"firstName": "Jane",
"lastName": "Doe"
},
"contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"action": "ContactNameUpdatedEvent",
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"createdAt": "2021-09-22T11:37:37.3022907+02:00"
},
"ttl": 120,
"_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
"_ts": 1632303457
}
Vidíte, že Contact
dokumenty a ContactNameUpdatedEvent
(typ domainEvent
) mají stejný klíč oddílu a že oba dokumenty budou zachovány ve stejném logickém oddílu.
Zpracování kanálu změn
Ke čtení datového proudu událostí a jejich odesílání do zprostředkovatele zpráv bude služba používat kanál změn služby Azure Cosmos DB.
Kanál změn je trvalý protokol změn v kontejneru. Funguje na pozadí a sleduje úpravy. V rámci jednoho logického oddílu je zaručeno pořadí změn. Nejpohodlnější způsob, jak číst kanál změn, je použít funkci Azure s triggerem služby Azure Cosmos DB. Další možností je použít knihovnu procesoru kanálu změn. Umožňuje integrovat zpracování kanálu změn ve webovém rozhraní API jako službu na pozadí (prostřednictvím IHostedService
rozhraní). Ukázka zde používá jednoduchou konzolovou aplikaci, která implementuje abstraktní třídu BackgroundService k hostování dlouhotrvajících úloh na pozadí v aplikacích .NET Core.
Pokud chcete přijímat změny z kanálu změn služby Azure Cosmos DB, musíte vytvořit instanci ChangeFeedProcessor
objektu, zaregistrovat metodu obslužné rutiny pro zpracování zpráv a začít naslouchat změnám:
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
var changeFeedProcessor = _container
.GetChangeFeedProcessorBuilder<ExpandoObject>(
_configuration.GetSection("Cosmos")["ProcessorName"],
HandleChangesAsync)
.WithInstanceName(Environment.MachineName)
.WithLeaseContainer(_leaseContainer)
.WithMaxItems(25)
.WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
.WithPollInterval(TimeSpan.FromSeconds(3))
.Build();
_logger.LogInformation("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
_logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
return changeFeedProcessor;
}
Metoda obslužné rutiny (HandleChangesAsync
zde) pak zpracuje zprávy. V této ukázce se události publikují do tématu služby Service Bus, které je rozdělené na oddíly pro škálovatelnost a má povolenou funkci odstranění duplicit. Každá služba, která má zájem o Contact
změny objektů, se pak může přihlásit k odběru daného tématu služby Service Bus a přijímat a zpracovávat změny pro svůj vlastní kontext.
Vytvořené zprávy služby Service Bus mají SessionId
vlastnost. Při použití relací ve službě Service Bus zaručujete, že se zachová pořadí zpráv (napřed in, first out (FIFO)). Pro tento případ použití je nutné zachovat pořadí.
Tady je fragment kódu, který zpracovává zprávy z kanálu změn:
private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received {changes.Count} document(s).");
var eventsCount = 0;
Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();
foreach (var document in changes as dynamic)
{
if (!((IDictionary<string, object>)document).ContainsKey("type") ||
!((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.
if (document.type == EVENT_TYPE) // domainEvent.
{
string json = JsonConvert.SerializeObject(document.data);
var sbMessage = new ServiceBusMessage(json)
{
ContentType = "application/json",
Subject = document.data.action,
MessageId = document.id,
PartitionKey = document.partitionKey,
SessionId = document.partitionKey
};
// Create message batch per partitionKey.
if (partitionedMessages.ContainsKey(document.partitionKey))
{
partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
}
else
{
partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
}
eventsCount++;
}
}
if (partitionedMessages.Count > 0)
{
_logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");
// Loop over each partition.
foreach (var partition in partitionedMessages)
{
// Create batch for partition.
using var messageBatch =
await _topicSender.CreateMessageBatchAsync(cancellationToken);
foreach (var msg in partition.Value)
if (!messageBatch.TryAddMessage(msg))
throw new Exception();
_logger.LogInformation(
$"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");
try
{
await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e.Message);
throw;
}
}
}
else
{
_logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
}
}
Zpracování chyb
Pokud během zpracování změn dojde k chybě, knihovna kanálu změn restartuje čtení zpráv na místě, kde úspěšně zpracovala poslední dávku. Pokud například aplikace úspěšně zpracovala 10 000 zpráv, pracuje na dávce 10 001 až 10 025 a dojde k chybě, může restartovat a vyzvednout svou práci na pozici 10 001. Knihovna automaticky sleduje, co bylo zpracováno prostřednictvím informací uložených v kontejneru ve službě Leases
Azure Cosmos DB.
Je možné, že služba již odeslala některé zprávy, které se znovu zpracovaly do služby Service Bus. Za normálních okolností by tento scénář vedl ke zpracování duplicitních zpráv. Jak už jsme uvedli dříve, service Bus má funkci pro detekci duplicitních zpráv, kterou je potřeba pro tento scénář povolit. Služba zkontroluje, jestli už byla zpráva přidána do tématu služby Service Bus (nebo fronty) na základě vlastnosti zprávy řízené MessageId
aplikací. Tato vlastnost je nastavena na ID
dokument události. Pokud se stejná zpráva odešle znovu do služby Service Bus, služba ji ignoruje a zahodí.
Údržba
V typické implementaci Transactional Outbox služba aktualizuje zpracovávané události a nastaví Processed
vlastnost na true
, která indikuje, že zpráva byla úspěšně publikována. Toto chování lze implementovat ručně v metodě obslužné rutiny. V aktuálním scénáři takový proces není potřeba. Azure Cosmos DB sleduje události zpracovávané pomocí kanálu změn (v kombinaci s kontejnerem Leases
).
V posledním kroku někdy potřebujete události z kontejneru odstranit, abyste zachovali pouze nejnovější záznamy a dokumenty. K pravidelnému vyčištění platí pro implementaci další funkce služby Azure Cosmos DB: Time To Live (TTL
) na dokumenty. Azure Cosmos DB může automaticky odstraňovat dokumenty na TTL
základě vlastnosti, kterou je možné přidat do dokumentu: časové období v sekundách. Služba bude neustále kontrolovat kontejner dokumentů, které mají TTL
vlastnost. Jakmile vyprší platnost dokumentu, Azure Cosmos DB ho z databáze odebere.
Pokud všechny komponenty fungují podle očekávání, události se zpracovávají a publikují rychle: během několika sekund. Pokud ve službě Azure Cosmos DB dojde k chybě, události se do sběrnice zpráv neodesílají, protože obchodní objekt i odpovídající události nelze do databáze uložit. Jedinou věcí, kterou je potřeba vzít v úvahu, je nastavit v dokumentech odpovídající TTL
hodnotu DomainEvent
, když pracovní proces na pozadí (procesor kanálu změn) nebo service bus není k dispozici. V produkčním prostředí je nejlepší vybrat časový rozsah několika dnů. Například 10 dní. Všechny zúčastněné komponenty pak budou mít dostatek času na zpracování a publikování změn v rámci aplikace.
Shrnutí
Model Transakční pošta k odeslání řeší problém spolehlivého publikování událostí domény v distribuovaných systémech. Potvrzením stavu obchodního objektu a jejích událostí ve stejné transakční dávce a použitím procesoru na pozadí jako předávání zpráv zajistíte, že ostatní služby, interní nebo externí, nakonec obdrží informace, na které závisejí. Tato ukázka není tradiční implementací modelu Transakční pošta k odeslání. Používá funkce, jako je kanál změn služby Azure Cosmos DB a funkce Time To Live, které udržují věci jednoduché a čisté.
Tady je souhrn komponent Azure používaných v tomto scénáři:
Stáhněte si soubor aplikace Visio s touto architekturou.
Výhody tohoto řešení jsou:
- Spolehlivé zasílání zpráv a zaručené doručování událostí
- Zachování pořadí událostí a odstranění duplicit zpráv přes Service Bus
- Není nutné udržovat další
Processed
vlastnost, která indikuje úspěšné zpracování dokumentu události. - Odstranění událostí ze služby Azure Cosmos DB prostřednictvím hodnoty TTL (Time to Live). Tento proces nevyužívají jednotky žádostí potřebné ke zpracování požadavků uživatelů a aplikací. Místo toho v úloze na pozadí používá jednotky žádostí "leftover".
- Zpracování zpráv s důkazy o chybách prostřednictvím
ChangeFeedProcessor
funkce (nebo funkce Azure) - Volitelné: Více procesorů kanálu změn, z nichž každá udržuje svůj vlastní ukazatel v kanálu změn.
Důležité informace
Ukázková aplikace probíraná v tomto článku ukazuje, jak v Azure implementovat model Transakční pošta k odeslání se službou Azure Cosmos DB a Service Bus. Existují také další přístupy, které používají databáze NoSQL. Chcete-li zaručit, že obchodní objekt a události budou spolehlivě uloženy v databázi, můžete vložit seznam událostí do dokumentu obchodního objektu. Nevýhodou tohoto přístupu je, že proces čištění bude muset aktualizovat každý dokument, který obsahuje události. To není ideální, zejména pokud jde o náklady na jednotku žádosti ve srovnání s použitím hodnoty TTL.
Mějte na paměti, že byste neměli brát v úvahu vzorový kód, který je zde k dispozici pro produkční prostředí. Má určitá omezení týkající se multithreadingu, zejména způsobu zpracování událostí ve DomainEntity
třídě a způsobu sledování objektů v CosmosContainerContext
implementacích. Použijte ho jako výchozí bod pro vlastní implementace. Případně zvažte použití existujících knihoven, které už mají tuto funkci integrovanou, jako je NServiceBus nebo MassTransit.
Nasazení tohoto scénáře
Zdrojový kód, soubory nasazení a pokyny k otestování tohoto scénáře najdete na GitHubu: https://github.com/mspnp/transactional-outbox-pattern
Přispěvatelé
Tento článek spravuje Microsoft. Původně byla napsána následujícími přispěvateli.
Hlavní autor:
- Christian Dennig | Vedoucí softwarový inženýr
Pokud chcete zobrazit neveřejné profily LinkedIn, přihlaste se na LinkedIn.
Další kroky
Další informace najdete v těchto článcích:
- Návrh řízený doménou
- Azure Service Bus: Odstranění duplicitních zpráv
- Knihovna procesoru kanálu změn
- Jimmy Bogard: Lepší vzor událostí domény