Mönster för händelsereplikeringsuppgifter
Översikten över federationsöversikten och replikeringsfunktionerna förklarar grunderna för och de grundläggande elementen i replikeringsuppgifterna, och vi rekommenderar att du bekantar dig med dem innan du fortsätter med den här artikeln.
I den här artikeln beskriver vi implementeringsvägledningen för flera av de mönster som markerats i översiktsavsnittet.
Replikering
Replikeringsmönstret kopierar händelser från en händelsehubb till nästa, eller från en händelsehubb till något annat mål, till exempel en Service Bus-kö. Händelserna vidarebefordras utan att göra några ändringar i händelsenyttolasten.
Implementeringen av det här mönstret omfattas av händelsereplikeringen mellan Händelsehubbar och Händelsereplikering mellan Event Hubs och Service Bus-exempel och självstudiekursen Använd Apache Kafka MirrorMaker med Event Hubs för det specifika fallet med replikering av data från en Apache Kafka-asynkron koordinator till Event Hubs.
Flöden och orderbevarande
Replikering, antingen via Azure Functions eller Azure Stream Analytics, syftar inte till att säkerställa skapandet av exakta 1:1-kloner av en källhändelsehubb till en målhändelsehubb, utan fokuserar på att bevara den relativa ordningen för händelser där programmet kräver det. Programmet kommunicerar detta genom att gruppera relaterade händelser med samma partitionsnyckel och Event Hubs ordnar meddelanden med samma partitionsnyckel sekventiellt i samma partition.
Viktigt!
"Offset"-informationen är unik för varje händelsehubb och förskjutningar för samma händelser skiljer sig åt mellan Event Hub-instanser. Om du vill hitta en position i en kopierad händelseström använder du tidsbaserade förskjutningar och refererar till de distribuerade tjänsttilldelade metadata.
Tidsbaserade förskjutningar startar mottagaren vid en viss tidpunkt:
- EventPosition.FromStart() – Läs alla behållna data igen.
- EventPosition.FromEnd() – Läsa alla nya data från tidpunkten för anslutningen.
- EventPosition.FromEnqueuedTime(dateTime) – Alla data från ett visst datum och en viss tid.
I EventProcessor anger du positionen via InitialOffsetProvider på EventProcessorOptions. Med de andra mottagar-API:erna skickas positionen genom konstruktorn.
De fördefinierade replikeringsfunktionshjälparna som tillhandahålls som exempel som används i Azure Functions-baserade vägledningen ser till att händelseströmmar med samma partitionsnyckel som hämtas från en källpartition skickas till målhändelsehubben som en batch i den ursprungliga dataströmmen och med samma partitionsnyckel.
Om partitionsantalet för käll- och målhändelsehubben är identiskt mappas alla strömmar i målet till samma partitioner som de gjorde i källan. Om partitionsantalet är annorlunda, vilket är viktigt i några av de ytterligare mönster som beskrivs i följande, skiljer sig mappningen åt, men strömmar hålls alltid tillsammans och i ordning.
Den relativa ordningen för händelser som hör till olika strömmar eller oberoende händelser utan en partitionsnyckel i en målpartition kan alltid skilja sig från källpartitionen.
Tjänsttilldelade metadata
De tjänsttilldelade metadata för en händelse som hämtas från källhändelsehubben, den ursprungliga kötiden, sekvensnumret och förskjutningen ersätts av nya tjänsttilldelade värden i målhändelsehubben, men med hjälpfunktionerna, replikeringsuppgifterna som anges i våra exempel bevaras de ursprungliga värdena i användaregenskaper: repl-enqueue-time
(ISO8601 sträng), repl-sequence
, . repl-offset
Dessa egenskaper är av typen sträng och innehåller det strängifierade värdet för respektive ursprungliga egenskaper. Om händelsen vidarebefordras flera gånger läggs de tjänsttilldelade metadata för den omedelbara källan till i de redan befintliga egenskaperna, med värden avgränsade med semikolon.
Redundans
Om du använder replikering i haveriberedskapssyfte, för att skydda mot regionala tillgänglighetshändelser i Event Hubs-tjänsten eller mot nätverksavbrott, kräver ett sådant fel att du utför en redundansväxling från en händelsehubb till nästa, vilket uppmanar producenter och/eller konsumenter att använda den sekundära slutpunkten.
För alla redundansscenarier förutsätts det att de nödvändiga elementen i namnrymderna är strukturellt identiska, vilket innebär att Event Hubs och Konsumentgrupper är identiskt namngivna och att regler för signatur för delad åtkomst och/eller rollbaserade regler för åtkomstkontroll har konfigurerats på samma sätt. Du kan skapa (och uppdatera) ett sekundärt namnområde genom att följa riktlinjerna för att flytta namnområden och utelämna rensningssteget.
För att tvinga producenter och konsumenter att växla måste du göra informationen om vilket namnområde som ska användas tillgängligt för sökning på en plats som är lätt att nå och uppdatera. Om producenter eller konsumenter stöter på frekventa eller beständiga fel bör de konsultera platsen och justera sin konfiguration. Det finns många sätt att dela den konfigurationen, men vi påpekar två i följande: DNS och filresurser.
DNS-baserad redundanskonfiguration
En kandidatmetod är att lagra informationen i DNS SRV-poster i en DNS som du styr och peka på respektive Event Hub-slutpunkter.
Viktigt!
Tänk på att Event Hubs inte tillåter att dess slutpunkter är direkt aliaserade med CNAME-poster, vilket innebär att du använder DNS som en elastisk uppslagsmekanism för slutpunktsadresser och inte för att direkt matcha IP-adressinformation.
Anta att du äger domänen example.com
och för ditt program en zon test.example.com
. För två alternativa händelsehubbar skapar du nu ytterligare två kapslade zoner och en SRV-post i var och en.
SRV-posterna är, enligt vanlig konvention, prefix med _azure_eventhubs._amqp
och innehåller två slutpunktsposter: en för AMQP-over-TLS på port 5671 och en för AMQP-over-WebSockets på port 443, som båda pekar på Event Hubs-slutpunkten för namnområdet som motsvarar zonen.
Zon | SRV-post |
---|---|
eh1.test.example.com |
_azure_servicebus._amqp.eh1.test.example.com 1 1 5671 eh1-test-example-com.servicebus.windows.net 2 2 443 eh1-test-example-com.servicebus.windows.net |
eh2.test.example.com |
_azure_servicebus._amqp.eh2.test.example.com 1 1 5671 eh2-test-example-com.servicebus.windows.net 2 2 443 eh2-test-example-com.servicebus.windows.net |
I programmets zon skapar du sedan en CNAME-post som pekar på den underordnade zonen som motsvarar din primära händelsehubb:
CNAME-post | Alias |
---|---|
eventhub.test.example.com |
eh1.test.example.com |
Med hjälp av en DNS-klient som gör det möjligt att köra frågor mot CNAME- och SRV-poster explicit (de inbyggda klienterna i Java och .NET tillåter endast enkel matchning av namn till IP-adresser) kan du sedan matcha önskad slutpunkt. Med DnsClient.NET är till exempel uppslagsfunktionen:
static string GetEventHubName(string aliasName)
{
const string SrvRecordPrefix = "_azure_eventhub._amqp.";
LookupClient lookup = new LookupClient();
return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
where srv.Port == 5671
select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}
Funktionen returnerar målvärdnamnet som registrerats för port 5671 i zonen som för närvarande är alias med CNAME enligt ovan.
Om du utför en redundansväxling måste du redigera CNAME-posten och peka den mot den alternativa zonen.
Fördelen med att använda DNS, och särskilt Azure DNS, är att Azure DNS-information replikeras globalt och därför är motståndskraftig mot avbrott i en region.
Den här proceduren liknar hur Event Hubs Geo-DR fungerar, men helt under din egen kontroll och fungerar även med aktiva/aktiva scenarier.
Filresursbaserad redundanskonfiguration
Det enklaste alternativet till att använda DNS för att dela slutpunktsinformation är att placera namnet på den primära slutpunkten i en oformaterad textfil och hantera filen från en infrastruktur som är robust mot avbrott och fortfarande tillåter uppdateringar.
Om du redan kör en webbplatsinfrastruktur med hög tillgänglighet med global tillgänglighet och innehållsreplikering lägger du till en sådan fil där och publicerar om filen om det behövs en växel.
Varning
Du bör bara publicera slutpunktsnamnet på det här sättet, inte en fullständig anslutningssträng inklusive hemligheter.
Extra saker att tänka på när det gäller rederior över konsumenter
För Event Hub-konsumenter beror ytterligare överväganden för redundansstrategin på händelseprocessorns behov.
Om det finns ett haveri som kräver att ett system återskapas, inklusive databaser, från säkerhetskopierade data och databaserna matas direkt eller via mellanliggande bearbetning från händelserna i händelsehubben, återställer du säkerhetskopian och vill sedan börja spela upp händelser i systemet från den tidpunkt då databassäkerhetskopian skapades och inte från det ögonblick då det ursprungliga systemet förstördes.
Om ett fel bara påverkar en del av ett system eller bara en enskild händelsehubb, som inte kan nås, kommer du förmodligen att vilja fortsätta bearbeta händelser från ungefär samma position där bearbetningen avbröts.
Om du vill förverkliga något av scenariot och använda händelseprocessorn för din respektive Azure SDK skapar du ett nytt kontrollpunktslager och anger en inledande partitionsposition baserat på den tidsstämpel som du vill återuppta bearbetningen från.
Om du fortfarande har åtkomst till kontrollpunktsarkivet för den händelsehubb som du växlar från, hjälper de utbredade metadata som beskrivs ovan dig att hoppa över händelser som redan har hanterats och återupptas exakt där du senast slutade.
Slå ihop
Kopplingsmönstret har en eller flera replikeringsuppgifter som pekar på ett mål, eventuellt samtidigt med vanliga producenter som också skickar händelser till samma mål.
Varianter av dessa patters är:
- Två eller flera replikeringsfunktioner hämtar samtidigt händelser från separata källor och skickar dem till samma mål.
- Ytterligare en replikeringsfunktion som hämtar händelser från en källa medan målet också används direkt av producenter.
- Det tidigare mönstret, men speglat mellan två eller flera händelsehubbar, vilket resulterar i de händelsehubbar som innehåller samma strömmar, oavsett var händelser produceras.
De två första mönstervariationerna är triviala och skiljer sig inte från vanliga replikeringsuppgifter.
Det sista scenariot kräver att redan replikerade händelser inte replikeras igen. Tekniken demonstreras och förklaras i EventHubToEventHubMerge-exemplet .
Editor
Redigeringsmönstret bygger på replikeringsmönstret , men meddelanden ändras innan de vidarebefordras.
Exempel på sådana ändringar är:
- Transcoding – Om händelseinnehållet (även kallat "brödtext" eller "nyttolast") kommer från källan som kodas med Apache Avro-formatet eller något eget serialiseringsformat, men förväntningen av att systemet äger målet är att innehållet ska vara JSON-kodat , kommer en transkodningsreplikeringsuppgift först att deserialisera nyttolasten från Apache Avro till ett minnesinternt objektdiagram och sedan serialisera den grafen till JSON format för händelsen som vidarebefordras. Transkodning omfattar även uppgifter för innehållskomprimering och dekomprimering.
- Transformering – Händelser som innehåller strukturerade data kan kräva omformning av dessa data för enklare förbrukning av nedströmsanvändare. Detta kan innebära arbete som att platta ut kapslade strukturer, rensa överflödiga dataelement eller omforma nyttolasten så att den passar exakt för ett visst schema.
- Batchbearbetning – Händelser kan tas emot i batchar (flera händelser i en enda överföring) från en källa, men måste vidarebefordras singly till ett mål eller vice versa. En aktivitet kan därför vidarebefordra flera händelser baserat på en enda överföring av indatahändelser eller aggregera en uppsättning händelser som sedan överförs tillsammans.
- Validering – Händelsedata från externa källor måste ofta kontrolleras om de följer en uppsättning regler innan de kan vidarebefordras. Reglerna kan uttryckas med hjälp av scheman eller kod. Händelser som inte är kompatibla kan tas bort, med problemet antecknade i loggar, eller vidarebefordras till ett särskilt målmål för att hantera dem ytterligare.
- Berikning – Händelsedata som kommer från vissa källor kan kräva berikning med ytterligare kontext för att de ska kunna användas i målsystem. Det kan handla om att leta upp referensdata och bädda in dessa data med händelsen, eller att lägga till information om källan som är känd för replikeringsaktiviteten, men som inte finns i händelserna.
- Filtrering – Vissa händelser som anländer från en källa kan behöva undanhållas från målet baserat på någon regel. Ett filter testar händelsen mot en regel och släpper händelsen om händelsen inte matchar regeln. Att filtrera bort dubbletthändelser genom att observera vissa kriterier och släppa efterföljande händelser med samma värden är en form av filtrering.
- Kryptografi – En replikeringsuppgift kan behöva dekryptera innehåll som kommer från källan och/eller kryptera innehåll som vidarebefordras och framåt till ett mål, och/eller så kan den behöva verifiera innehållets och metadatas integritet i förhållande till en signatur som bärs i händelsen eller bifoga en sådan signatur.
- Attestering – En replikeringsuppgift kan bifoga metadata, som kan skyddas av en digital signatur, till en händelse som intygar att händelsen har tagits emot via en specifik kanal eller vid en viss tidpunkt.
- Länkning – En replikeringsuppgift kan använda signaturer för strömmar av händelser så att dataströmmens integritet skyddas och att händelser som saknas kan identifieras.
Transformerings-, batch- och berikningsmönster implementeras vanligtvis bäst med Azure Stream Analytics-jobb .
Alla dessa mönster kan implementeras med Hjälp av Azure Functions, med event hubs-utlösaren för att hämta händelser och Event Hub-utdatabindningen för att leverera dem.
Routning
Routningsmönstret bygger på replikeringsmönstret , men i stället för att ha en källa och ett mål har replikeringsaktiviteten flera mål, vilket illustreras här i C#:
[FunctionName("EH2EH")]
public static async Task Run(
[EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
[EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
[EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
ILogger log)
{
foreach (EventData eventData in events)
{
// send to output1 and/or output2 based on criteria
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
});
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
});
}
}
Routningsfunktionen tar hänsyn till meddelandets metadata och/eller meddelandets nyttolast och väljer sedan något av de tillgängliga mål som ska skickas till.
I Azure Stream Analytics kan du uppnå samma sak med att definiera flera utdata och sedan köra en fråga per utdata.
select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2
Loggprojektion
Loggprojektionsmönstret jämnar ut händelseströmmen till en indexerad databas, där händelser blir poster i databasen. Vanligtvis läggs händelser till i samma samling eller tabell, och partitionsnyckeln för händelsehubben blir en del av den primära nyckeln som letar efter att göra posten unik.
Loggprojektion kan skapa en tidsseriehistoriker för dina händelsedata eller en komprimerad vy, där endast den senaste händelsen behålls för varje partitionsnyckel. Måldatabasens form är i slutändan upp till dig och programmets behov. Det här mönstret kallas även för "händelsekällor".
Dricks
Du kan enkelt skapa loggprojektioner i Azure SQL Database och Azure Cosmos DB i Azure Stream Analytics, och du bör föredra det alternativet.
Följande Azure-funktion projicerar innehållet i en händelsehubb som komprimerats till en Azure Cosmos DB-samling.
[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
[EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
[CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
ILogger log)
{
foreach (var ev in input)
{
if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
{
var record = new
{
id = ev.SystemProperties.PartitionKey,
data = JsonDocument.Parse(ev.Body),
properties = ev.Properties
};
await output.AddAsync(record);
}
}
}