Delen via


Patronen voor gebeurtenisreplicatietaken

In het overzicht van federatie- en replicatorfuncties wordt uitgelegd wat de redenen zijn voor en de basiselementen van replicatietaken. Het is raadzaam om uzelf vertrouwd te maken met deze taken voordat u verdergaat met dit artikel.

In dit artikel beschrijven we de implementatierichtlijnen voor verschillende patronen die in de overzichtssectie zijn gemarkeerd.

Replicatie

Het replicatiepatroon kopieert gebeurtenissen van de ene Event Hub naar de volgende, of van een Event Hub naar een andere bestemming, zoals een Service Bus-wachtrij. De gebeurtenissen worden doorgestuurd zonder wijzigingen aan te brengen in de nettolading van de gebeurtenis.

De implementatie van dit patroon wordt gedekt door de gebeurtenisreplicatie tussen Event Hubs en Event Replication tussen Event Hubs en Service Bus-voorbeelden en de zelfstudie Apache Kafka MirrorMaker gebruiken met Event Hubs voor het specifieke geval van het repliceren van gegevens van een Apache Kafka-broker naar Event Hubs.

Streams en bestelbehoud

Replicatie, hetzij via Azure Functions of Azure Stream Analytics, is niet bedoeld om het maken van exacte 1:1 kloons van een brongebeurtenishub in een doelgebeurtenishub te garanderen, maar is gericht op het behouden van de relatieve volgorde van gebeurtenissen waarvoor de toepassing dit vereist. De toepassing communiceert dit door gerelateerde gebeurtenissen te groeperen met dezelfde partitiesleutel en Event Hubs rangschikt berichten met dezelfde partitiesleutel opeenvolgend in dezelfde partitie.

Belangrijk

De 'offset'-informatie is uniek voor elke Event Hub en offsets voor dezelfde gebeurtenissen verschillen tussen Event Hub-exemplaren. Als u een positie in een gekopieerde gebeurtenisstroom wilt vinden, gebruikt u tijdgebaseerde offsets en verwijst u naar de door de service toegewezen metagegevens.

Op tijd gebaseerde offsets starten uw ontvanger op een bepaald tijdstip:

  • EventPosition.FromStart() - Alle bewaarde gegevens opnieuw lezen.
  • EventPosition.FromEnd() - Alle nieuwe gegevens lezen vanaf het moment van de verbinding.
  • EventPosition.FromEnqueuedTime(dateTime) - Alle gegevens beginnen vanaf een bepaalde datum en tijd.

In de EventProcessor stelt u de positie in via de InitialOffsetProvider op de EventProcessorOptions. Met de andere ontvanger-API's wordt de positie doorgegeven via de constructor.

De vooraf gebouwde hulp voor replicatiefuncties die worden geleverd als voorbeelden die worden gebruikt in de op Azure Functions gebaseerde richtlijnen, zorgen ervoor dat gebeurtenisstromen met dezelfde partitiesleutel die uit een bronpartitie zijn opgehaald, als een batch in de oorspronkelijke stream en met dezelfde partitiesleutel in de doel-Event Hub worden verzonden.

Als het aantal partities van de bron- en doelgebeurtenishub identiek is, worden alle streams in het doel toegewezen aan dezelfde partities als in de bron. Als het aantal partities verschilt, wat van belang is in een aantal verdere patronen die in het volgende worden beschreven, verschilt de toewijzing, maar worden streams altijd bij elkaar gehouden en in volgorde gehouden.

De relatieve volgorde van gebeurtenissen die behoren tot verschillende streams of van onafhankelijke gebeurtenissen zonder partitiesleutel in een doelpartitie kan altijd verschillen van de bronpartitie.

Door de service toegewezen metagegevens

De door de service toegewezen metagegevens van een gebeurtenis die is verkregen uit de bronGebeurtenishub, de oorspronkelijke wachtrijtijd, het volgnummer en de offset, worden vervangen door nieuwe service toegewezen waarden in de doel-Event Hub, maar met de helperfuncties, replicatietaken die in onze voorbeelden worden verstrekt, blijven de oorspronkelijke waarden behouden in gebruikerseigenschappen: repl-enqueue-time (ISO8601 tekenreeks), repl-sequence, repl-offset.

Deze eigenschappen zijn van het type tekenreeks en bevatten de tekenreekswaarde van de respectieve oorspronkelijke eigenschappen. Als de gebeurtenis meerdere keren wordt doorgestuurd, worden de door de service toegewezen metagegevens van de onmiddellijke bron toegevoegd aan de reeds bestaande eigenschappen, met waarden gescheiden door puntkomma's.

Failover

Als u replicatie gebruikt voor herstel na noodgevallen, om te beschermen tegen regionale beschikbaarheidsgebeurtenissen in de Event Hubs-service of tegen netwerkonderbrekingen, moet voor dergelijke fouten een failover van de ene Event Hub naar het volgende worden uitgevoerd, waarbij producenten en/of consumenten het secundaire eindpunt moeten gebruiken.

Voor alle failoverscenario's wordt ervan uitgegaan dat de vereiste elementen van de naamruimten structureel identiek zijn, wat betekent dat Event Hubs en consumentengroepen identiek zijn benoemd en dat gedeelde toegangshandtekeningsregels en/of regels voor op rollen gebaseerd toegangsbeheer op dezelfde manier worden ingesteld. U kunt een secundaire naamruimte maken (en bijwerken) door de richtlijnen te volgen voor het verplaatsen van naamruimten en het weglaten van de opschoonstap.

Als u wilt afdwingen dat producenten en consumenten overschakelen, moet u de informatie over welke naamruimte beschikbaar moet worden gemaakt voor zoeken op een locatie die gemakkelijk te bereiken en bij te werken is. Als producenten of consumenten frequente of permanente fouten ondervinden, moeten ze die locatie raadplegen en hun configuratie aanpassen. Er zijn talloze manieren om die configuratie te delen, maar we wijzen op twee in het volgende: DNS en bestandsshares.

Failoverconfiguratie op basis van DNS

Een kandidaatbenadering is het opslaan van de informatie in DNS SRV-records in een DNS die u beheert en verwijst naar de respectieve Event Hub-eindpunten.

Belangrijk

Houd er rekening mee dat Event Hubs niet toestaat dat de eindpunten rechtstreeks worden gealiaseerd met CNAME-records. Dit betekent dat u DNS gebruikt als een tolerant opzoekmechanisme voor eindpuntadressen en niet om IP-adresgegevens rechtstreeks om te zetten.

Stel dat u de eigenaar bent van het domein example.com en, voor uw toepassing, een zone test.example.com. Voor twee alternatieve Event Hubs maakt u nu twee andere geneste zones en een SRV-record in elk.

De SRV-records zijn, volgens de algemene conventie, voorafgegaan door _azure_eventhubs._amqp en bevatten twee eindpuntrecords: één voor AMQP-over-TLS op poort 5671 en één voor AMQP-over-WebSockets op poort 443, die beide verwijzen naar het Event Hubs-eindpunt van de naamruimte die overeenkomt met de zone.

Zone SRV-record
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

In de zone van uw toepassing maakt u vervolgens een CNAME-vermelding die verwijst naar de onderliggende zone die overeenkomt met uw primaire Event Hub:

CNAME-record Alias
eventhub.test.example.com eh1.test.example.com

Met behulp van een DNS-client die het uitvoeren van query's op CNAME- en SRV-records expliciet toestaat (de ingebouwde clients van Java en .NET alleen eenvoudige omzetting van namen naar IP-adressen toestaan), kunt u vervolgens het gewenste eindpunt omzetten. Met DnsClient.NET is de opzoekfunctie bijvoorbeeld:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

De functie retourneert de doelhostnaam die is geregistreerd voor poort 5671 van de zone die momenteel is gealiaseerd met de CNAME, zoals hierboven wordt weergegeven.

Voor het uitvoeren van een failover moet de CNAME-record worden bewerkt en naar de alternatieve zone worden verwijst.

Het voordeel van het gebruik van DNS, en met name Azure DNS, is dat Azure DNS-gegevens wereldwijd worden gerepliceerd en daarom bestand zijn tegen storingen in één regio.

Deze procedure is vergelijkbaar met de werking van de Event Hubs Geo-DR , maar volledig onder uw eigen controle en werkt ook met actieve/actieve scenario's.

Failoverconfiguratie op basis van bestandsshares

Het eenvoudigste alternatief voor het gebruik van DNS voor het delen van eindpuntgegevens is het plaatsen van de naam van het primaire eindpunt in een bestand zonder opmaak en het bestand uit een infrastructuur die robuust is tegen storingen en nog steeds updates toestaat.

Als u al een maximaal beschikbare website-infrastructuur uitvoert met globale beschikbaarheid en inhoudsreplicatie, voegt u daar een dergelijk bestand toe en publiceert u het bestand opnieuw als er een switch nodig is.

Let op

U moet de naam van het eindpunt alleen op deze manier publiceren, niet een volledige verbindingsreeks inclusief geheimen.

Extra overwegingen voor failover van consumenten

Voor Event Hub-consumenten zijn verdere overwegingen voor de failoverstrategie afhankelijk van de behoeften van de gebeurtenisprocessor.

Als er zich een noodgeval voordoet waarbij een systeem, inclusief databases, van back-upgegevens, opnieuw moet worden opgebouwd en de databases rechtstreeks of via tussenliggende verwerking vanuit de gebeurtenissen in de Event Hub moeten worden verwerkt, herstelt u de back-up en wilt u vervolgens gebeurtenissen opnieuw afspelen in het systeem vanaf het moment waarop de databaseback-up is gemaakt en niet vanaf het moment dat het oorspronkelijke systeem is vernietigd.

Als een fout alleen van invloed is op een segment van een systeem of zelfs slechts één Event Hub, die onbereikbaar is geworden, wilt u waarschijnlijk doorgaan met het verwerken van gebeurtenissen vanaf ongeveer dezelfde positie waar de verwerking is onderbroken.

Als u een van beide scenario's wilt realiseren en de gebeurtenisprocessor van uw respectieve Azure SDK wilt gebruiken, maakt u een nieuw controlepuntarchief en geeft u een initiële partitiepositie op op basis van de tijdstempel waaruit u de verwerking wilt hervatten.

Als u nog steeds toegang hebt tot het controlepuntarchief van de Event Hub waarvan u afschakelt, kunt u met de hierboven beschreven doorgegeven metagegevens gebeurtenissen overslaan die al zijn verwerkt en hervat, precies vanaf waar u het laatst was gebleven.

Samenvoeging

Het samenvoegpatroon bevat een of meer replicatietaken die verwijzen naar één doel, mogelijk gelijktijdig met reguliere producenten die ook gebeurtenissen naar hetzelfde doel verzenden.

Variaties van deze patters zijn:

  • Twee of meer replicatiefuncties verzamelen gelijktijdig gebeurtenissen uit afzonderlijke bronnen en verzenden ze naar hetzelfde doel.
  • Nog een replicatiefunctie die gebeurtenissen van een bron verkrijgt, terwijl het doel ook rechtstreeks door producenten wordt gebruikt.
  • Het vorige patroon, maar gespiegeld tussen twee of meer Event Hubs, wat resulteert in die Event Hubs die dezelfde streams bevatten, ongeacht waar gebeurtenissen worden geproduceerd.

De eerste twee patroonvariaties zijn triviaal en verschillen niet van normale replicatietaken.

In het laatste scenario moeten al gerepliceerde gebeurtenissen niet opnieuw worden gerepliceerd. De techniek wordt gedemonstreerd en uitgelegd in het EventHubToEventHubMerge-voorbeeld .

Editor

Het editorpatroon is gebaseerd op het replicatiepatroon , maar berichten worden gewijzigd voordat ze worden doorgestuurd.

Voorbeelden voor dergelijke wijzigingen zijn:

  • Transcodering : als de gebeurtenisinhoud (ook wel 'body' of 'payload' genoemd) afkomstig is van de bron die is gecodeerd met behulp van de Apache Avro-indeling of een eigen serialisatie-indeling, maar de verwachting van het systeem dat eigenaar is van het doel, is dat de inhoud JSON wordt gecodeerd, een transcoderingsreplicatietaak eerst deserialiseert de nettolading van Apache Avro in een objectgrafiek in het geheugen en die grafiek vervolgens serialiseert in de JSON indeling voor de gebeurtenis die wordt doorgestuurd. Transcodering omvat ook inhoudscompressie - en decompressietaken.
  • Transformatie : gebeurtenissen die gestructureerde gegevens bevatten, moeten die gegevens mogelijk opnieuw worden aangepast voor eenvoudiger verbruik door downstreamgebruikers. Dit kan betrekking hebben op het platmaken van geneste structuren, het verwijderen van overbodige gegevenselementen of het opnieuw vormgeven van de nettolading zodat deze exact in een bepaald schema past.
  • Batchverwerking : gebeurtenissen kunnen worden ontvangen in batches (meerdere gebeurtenissen in één overdracht) van een bron, maar moeten singly worden doorgestuurd naar een doel of omgekeerd. Een taak kan daarom meerdere gebeurtenissen doorsturen op basis van één invoergebeurtenisoverdracht of een reeks gebeurtenissen aggregeren die vervolgens samen worden overgedragen.
  • Validatie : gebeurtenisgegevens uit externe bronnen moeten vaak worden gecontroleerd of ze voldoen aan een set regels voordat ze kunnen worden doorgestuurd. De regels kunnen worden uitgedrukt met behulp van schema's of code. Gebeurtenissen die niet in overeenstemming zijn, kunnen worden verwijderd, met het probleem dat wordt vermeld in logboeken of kunnen worden doorgestuurd naar een speciale doelbestemming om ze verder te verwerken.
  • Verrijking : gebeurtenisgegevens die afkomstig zijn uit sommige bronnen, vereisen mogelijk verrijking met verdere context, zodat deze bruikbaar zijn in doelsystemen. Dit kan betrekking hebben op het opzoeken van referentiegegevens en het insluiten van die gegevens met de gebeurtenis, of het toevoegen van informatie over de bron die bekend is bij de replicatietaak, maar niet in de gebeurtenissen.
  • Filteren : sommige gebeurtenissen die afkomstig zijn van een bron, moeten mogelijk worden ingehouden vanuit het doel op basis van een bepaalde regel. Met een filter wordt de gebeurtenis getest op basis van een regel en wordt de gebeurtenis verwijderd als de gebeurtenis niet overeenkomt met de regel. Dubbele gebeurtenissen filteren door bepaalde criteria te observeren en volgende gebeurtenissen met dezelfde waarden te verwijderen, is een vorm van filteren.
  • Cryptografie : een replicatietaak moet mogelijk inhoud ontsleutelen die afkomstig is van de bron en/of inhoud versleutelen die wordt doorgestuurd naar een doel, en/of het moet mogelijk de integriteit van inhoud en metagegevens verifiëren ten opzichte van een handtekening die in de gebeurtenis wordt uitgevoerd, of een dergelijke handtekening bijvoegen.
  • Attestation : een replicatietaak kan metagegevens, mogelijk beveiligd door een digitale handtekening, koppelen aan een gebeurtenis die bevestigt dat de gebeurtenis is ontvangen via een specifiek kanaal of op een bepaald tijdstip.
  • Koppelen: een replicatietaak kan handtekeningen toepassen op stromen gebeurtenissen, zodat de integriteit van de stream wordt beveiligd en ontbrekende gebeurtenissen kunnen worden gedetecteerd.

De transformatie-, batch- en verrijkingspatronen zijn over het algemeen het best geïmplementeerd met Azure Stream Analytics-taken .

Al deze patronen kunnen worden geïmplementeerd met behulp van Azure Functions, met behulp van de Event Hubs-trigger voor het verkrijgen van gebeurtenissen en de Event Hub-uitvoerbinding voor het leveren ervan.

Routering

Het routeringspatroon is gebaseerd op het replicatiepatroon , maar in plaats van één bron en één doel te hebben, heeft de replicatietaak meerdere doelen, zoals hier wordt geïllustreerd in C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

De routeringsfunctie houdt rekening met de metagegevens van het bericht en/of de nettolading van het bericht en kiest vervolgens een van de beschikbare bestemmingen om naar te verzenden.

In Azure Stream Analytics kunt u hetzelfde bereiken met het definiëren van meerdere uitvoerwaarden en vervolgens het uitvoeren van een query per uitvoer.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Logboekprojectie

Met het patroon voor logboekprojectie wordt de gebeurtenisstroom afgevlakt naar een geïndexeerde database, waarbij gebeurtenissen records in de database worden. Normaal gesproken worden gebeurtenissen toegevoegd aan dezelfde verzameling of tabel en maakt de Event Hub-partitiesleutel deel uit van de primaire sleutel die de record uniek maakt.

Logboekprojectie kan een tijdreekshistoricus van uw gebeurtenisgegevens of een gecomprimeerde weergave produceren, waarbij alleen de meest recente gebeurtenis wordt bewaard voor elke partitiesleutel. De vorm van de doeldatabase is uiteindelijk aan u en de behoeften van uw toepassing. Dit patroon wordt ook wel gebeurtenisbronnen genoemd.

Tip

U kunt eenvoudig logboekprojecties maken in Azure SQL Database en Azure Cosmos DB in Azure Stream Analytics, en u moet de voorkeur geven aan die optie.

De volgende Azure Function projecteert de inhoud van een Event Hub die is gecomprimeerd in een Azure Cosmos DB-verzameling.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Volgende stappen