De estimator van de wijzigingenfeed gebruiken
VAN TOEPASSING OP: NoSQL
In dit artikel wordt beschreven hoe u de voortgang van de processorexemplaren van de wijzigingenfeed kunt controleren terwijl ze de wijzigingenfeed lezen.
Waarom is het bewaken van de voortgang belangrijk?
De wijzigingenfeedprocessor fungeert als een aanwijzer die door uw wijzigingenfeed wordt verplaatst en de wijzigingen aan een gedelegeerde implementatie levert.
De implementatie van de wijzigingenfeedprocessor kan wijzigingen met een bepaalde snelheid verwerken op basis van de beschikbare resources, zoals CPU, geheugen, netwerk, enzovoort.
Als deze snelheid langzamer is dan de snelheid waarmee uw wijzigingen in uw Azure Cosmos DB-container plaatsvinden, begint uw processor achter te blijven.
Als u dit scenario identificeert, begrijpt u of we de implementatie van de wijzigingenfeedprocessor moeten schalen.
De wijzigingsfeed-estimator implementeren
Als pushmodel voor automatische meldingen
Net als de verwerker van de wijzigingenfeed kan de wijzigingsfeedschatter werken als pushmodel. De estimator meet het verschil tussen het laatst verwerkte item (gedefinieerd door de status van de leasecontainer) en de meest recente wijziging in de container en pusht deze waarde naar een gemachtigde. Het interval waarmee de meting wordt uitgevoerd, kan ook worden aangepast met een standaardwaarde van 5 seconden.
Als uw wijzigingenfeedprocessor bijvoorbeeld de nieuwste versiemodus gebruikt en als volgt wordt gedefinieerd:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
De juiste manier om een estimator te initialiseren om die processor te meten, gebruikt GetChangeFeedEstimatorBuilder
als volgt:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
Waarbij zowel de processor als de estimator dezelfde leaseContainer
en dezelfde naam hebben.
De andere twee parameters zijn de gedelegeerde, die een getal ontvangt dat aangeeft hoeveel wijzigingen moeten worden gelezen door de processor en het tijdsinterval waarop u deze meting wilt uitvoeren.
Een voorbeeld van een gemachtigde die de schatting ontvangt, is:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
U kunt deze schatting naar uw bewakingsoplossing verzenden en deze gebruiken om te begrijpen hoe uw voortgang zich na verloop van tijd gedraagt.
Als een gedetailleerde schatting op aanvraag
In tegenstelling tot het pushmodel is er een alternatief waarmee u de schatting op aanvraag kunt verkrijgen. Dit model biedt ook meer gedetailleerde informatie:
- De geschatte vertraging per lease.
- Het exemplaar dat eigenaar is van en elke lease verwerkt, zodat u kunt vaststellen of er een probleem is op een exemplaar.
Als uw wijzigingenfeedprocessor als volgt is gedefinieerd:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
U kunt de estimator maken met dezelfde leaseconfiguratie:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
En wanneer u wilt, met de frequentie die u nodig hebt, kunt u de gedetailleerde schatting verkrijgen:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Elk ChangeFeedProcessorState
bevat de lease- en vertragingsgegevens en ook de huidige instantie die eigenaar is van het huidige exemplaar.
Implementatie van estimator
De wijzigingsfeed-estimator hoeft niet te worden geïmplementeerd als onderdeel van uw wijzigingenfeedverwerker en hoeft ook geen deel uit te maken van hetzelfde project. U wordt aangeraden de estimator te implementeren op een onafhankelijk exemplaar van uw processors. Eén estimator-exemplaar kan de voortgang van alle leases en exemplaren in de implementatie van de wijzigingenfeedprocessor bijhouden.
Elke schatting verbruikt aanvraageenheden uit uw bewaakte en leasecontainers. Een frequentie van 1 minuut tussen is een goed startpunt, hoe lager de frequentie, hoe hoger de verbruikte aanvraageenheden.
Ondersteunde modi voor wijzigingenfeeds
De estimator van de wijzigingenfeed kan worden gebruikt voor zowel de nieuwste versiemodus als alle versies en de modus Voor verwijderen. In beide modi is de opgegeven schatting niet gegarandeerd een exacte telling van openstaande wijzigingen die moeten worden verwerkt.
Aanvullende bronnen
- Azure Cosmos DB SDK
- Gebruiksvoorbeelden op GitHub (nieuwste versie van.NET)
- Gebruiksvoorbeelden op GitHub (.NET alle versies en verwijderingen)
- Gebruiksvoorbeelden op GitHub (Java)
- Aanvullende voorbeelden op GitHub
Volgende stappen
U kunt nu verdergaan met meer informatie over de verwerker van wijzigingenfeeds in het volgende artikel: