Modelli di attività di replica di eventi
La panoramica della federazione e la panoramica delle funzioni del replicatore illustrano la logica per e gli elementi di base delle attività di replica ed è consigliabile acquisire familiarità con loro prima di continuare con questo articolo.
In questo articolo vengono illustrate in dettaglio le linee guida per l'implementazione per diversi modelli evidenziati nella sezione panoramica.
Replica
Il modello di replica copia gli eventi da un hub eventi a quello successivo o da un hub eventi a un'altra destinazione, ad esempio una coda di bus di servizio. Gli eventi vengono inoltrati senza apportare modifiche al payload dell'evento.
L'implementazione di questo modello è coperta dalla replica di eventi tra Hub eventi e la replica di eventi tra Hub eventi e gli esempi di bus di servizio e l'esercitazione Usare Apache Kafka MirrorMaker con Hub eventi per il caso specifico di replica dei dati da un broker Apache Kafka in Hub eventi.
Flussi e conservazione degli ordini
La replica, tramite Funzioni di Azure o Analisi di flusso di Azure, non mira a garantire la creazione di cloni esatti 1:1 di un hub eventi di origine in un hub eventi di destinazione, ma è incentrata sul mantenimento dell'ordine relativo degli eventi in cui l'applicazione lo richiede. L'applicazione comunica questo raggruppamento di eventi correlati con la stessa chiave di partizione e Hub eventi dispone i messaggi con la stessa chiave di partizione in sequenza nella stessa partizione.
Importante
Le informazioni di "offset" sono univoche per ogni hub eventi e gli offset per gli stessi eventi differiscono tra le istanze di Hub eventi. Per individuare una posizione in un flusso di eventi copiato, usare gli offset basati sul tempo e fare riferimento ai metadati propagati assegnati al servizio.
Gli offset basati sul tempo avviano il ricevitore in un momento specifico:
- EventPosition.FromStart(): legge di nuovo tutti i dati conservati.
- EventPosition.FromEnd(): legge tutti i nuovi dati dal momento della connessione.
- EventPosition.FromEnqueuedTime(dateTime): tutti i dati a partire da una data e un'ora specificati.
In EventProcessor impostare la posizione tramite InitialOffsetProvider in EventProcessorOptions. Con le altre API ricevitore, la posizione viene passata attraverso il costruttore.
Gli helper della funzione di replica predefiniti forniti come esempi usati nelle linee guida basate su Funzioni di Azure assicurano che i flussi di eventi con la stessa chiave di partizione recuperata da una partizione di origine vengano inviati all'hub eventi di destinazione come batch nel flusso originale e con la stessa chiave di partizione.
Se il numero di partizioni dell'hub eventi di origine e di destinazione è identico, tutti i flussi nella destinazione verranno mappati alle stesse partizioni dell'origine. Se il numero di partizioni è diverso, che conta in alcuni dei modelli descritti di seguito, il mapping sarà diverso, ma i flussi vengono sempre mantenuti insieme e in ordine.
L'ordine relativo degli eventi appartenenti a flussi diversi o di eventi indipendenti senza una chiave di partizione in una partizione di destinazione può essere sempre diverso dalla partizione di origine.
Metadati assegnati al servizio
I metadati assegnati dal servizio di un evento ottenuto dall'hub eventi di origine, l'ora di accodamento originale, il numero di sequenza e l'offset vengono sostituiti da nuovi valori assegnati dal servizio nell'hub eventi di destinazione, ma con le funzioni helper, le attività di replica fornite negli esempi, i valori originali vengono mantenuti nelle proprietà utente: repl-enqueue-time
(ISO8601 stringa), repl-sequence
, repl-offset
.
Tali proprietà sono di tipo string e contengono il valore stringaficato delle rispettive proprietà originali. Se l'evento viene inoltrato più volte, i metadati assegnati dal servizio dell'origine immediata vengono aggiunti alle proprietà già esistenti, con valori separati da punto e virgola.
Failover
Se si usa la replica a scopo di ripristino di emergenza, per proteggersi da eventi di disponibilità a livello di area nel servizio Hub eventi o da interruzioni di rete, qualsiasi scenario di errore di questo tipo richiederà l'esecuzione di un failover da un hub eventi al successivo, indicando ai produttori e/o ai consumer di usare l'endpoint secondario.
Per tutti gli scenari di failover, si presuppone che gli elementi obbligatori degli spazi dei nomi siano strutturalmente identici, vale a dire che Hub eventi e gruppi di consumer sono denominati in modo identico e che le regole di firma di accesso condiviso e/o le regole di controllo degli accessi in base al ruolo vengono configurate nello stesso modo. È possibile creare (e aggiornare) uno spazio dei nomi secondario seguendo le indicazioni per lo spostamento degli spazi dei nomi e omettendo il passaggio di pulizia.
Per forzare i produttori e i consumer a passare, è necessario rendere disponibili le informazioni sullo spazio dei nomi da usare per la ricerca in una posizione facile da raggiungere e aggiornare. Se i produttori o i consumer riscontrano errori frequenti o persistenti, devono consultare tale posizione e regolare la configurazione. Esistono diversi modi per condividere tale configurazione, ma vengono indicati due nei modi seguenti: DNS e condivisioni file.
Configurazione del failover basato su DNS
Un approccio candidato consiste nel contenere le informazioni nei record DNS SRV in un DNS controllato e puntare ai rispettivi endpoint dell'hub eventi.
Importante
Tenere presente che Hub eventi non consente l'alias diretto degli endpoint con i record CNAME, il che significa che si userà DNS come meccanismo di ricerca resiliente per gli indirizzi endpoint e non per risolvere direttamente le informazioni sugli indirizzi IP.
Si supponga di essere proprietari del dominio example.com
e, per l'applicazione, di una zona test.example.com
. Per due hub eventi alternativi, verranno ora create due zone annidate e un record SRV in ognuno.
I record SRV sono, seguendo una convenzione comune, preceduti _azure_eventhubs._amqp
da e contengono due record endpoint: uno per AMQP-over-TLS sulla porta 5671 e uno per AMQP-over-WebSocket sulla porta 443, entrambi puntando all'endpoint di Hub eventi dello spazio dei nomi corrispondente alla zona.
Zona | Record SRV |
---|---|
eh1.test.example.com |
_azure_servicebus._amqp.eh1.test.example.com 1 1 5671 eh1-test-example-com.servicebus.windows.net 2 2 443 eh1-test-example-com.servicebus.windows.net |
eh2.test.example.com |
_azure_servicebus._amqp.eh2.test.example.com 1 1 5671 eh2-test-example-com.servicebus.windows.net 2 2 443 eh2-test-example-com.servicebus.windows.net |
Nella zona dell'applicazione si creerà quindi una voce CNAME che punta alla zona subordinata corrispondente all'hub eventi primario:
Record CNAME | Alias |
---|---|
eventhub.test.example.com |
eh1.test.example.com |
Usando un client DNS che consente di eseguire query su record CNAME e SRV in modo esplicito (i client predefiniti di Java e .NET consentono solo la risoluzione semplice dei nomi agli indirizzi IP), è quindi possibile risolvere l'endpoint desiderato. Con DnsClient.NET, ad esempio, la funzione di ricerca è:
static string GetEventHubName(string aliasName)
{
const string SrvRecordPrefix = "_azure_eventhub._amqp.";
LookupClient lookup = new LookupClient();
return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
where srv.Port == 5671
select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}
La funzione restituisce il nome host di destinazione registrato per la porta 5671 della zona attualmente con alias con CNAME, come illustrato in precedenza.
L'esecuzione di un failover richiede la modifica del record CNAME e la punta alla zona alternativa.
Il vantaggio dell'uso di DNS e, in particolare di DNS di Azure, è che le informazioni DNS di Azure vengono replicate a livello globale e quindi resilienti in caso di interruzioni a singola area.
Questa procedura è simile al funzionamento del ripristino di emergenza geografico di Hub eventi, ma completamente sotto il proprio controllo e funziona anche con scenari attivi/attivi.
Configurazione del failover basato su condivisione file
L'alternativa più semplice all'uso del DNS per la condivisione delle informazioni sugli endpoint consiste nell'inserire il nome dell'endpoint primario in un file di testo normale e gestire il file da un'infrastruttura affidabile contro le interruzioni e consente comunque gli aggiornamenti.
Se si esegue già un'infrastruttura del sito Web a disponibilità elevata con disponibilità globale e replica del contenuto, aggiungere tale file e ripubblicare il file se è necessario un commutatore.
Attenzione
È consigliabile pubblicare il nome dell'endpoint solo in questo modo, non un stringa di connessione completo, inclusi i segreti.
Considerazioni aggiuntive per il failover dei consumer
Per i consumer di Hub eventi, altre considerazioni per la strategia di failover dipendono dalle esigenze del processore di eventi.
Se si verifica un'emergenza che richiede la ricompilazione di un sistema, inclusi i database, dai dati di backup e i database vengono inseriti direttamente o tramite l'elaborazione intermedia degli eventi contenuti nell'hub eventi, si ripristinerà il backup e quindi si desidera avviare la riproduzione degli eventi nel sistema dal momento in cui è stato creato il backup del database e non dal momento in cui il sistema originale è stato distrutto.
Se un errore interessa solo una sezione di un sistema o solo un singolo hub eventi, che è diventato non raggiungibile, è probabile che si voglia continuare a elaborare gli eventi dalla stessa posizione in cui l'elaborazione è stata interrotta.
Per realizzare uno scenario e usare il processore di eventi della rispettiva SDK di Azure, si creerà un nuovo archivio checkpoint e si fornirà una posizione iniziale della partizione, in base al timestamp da cui si vuole riprendere l'elaborazione.
Se si ha ancora accesso all'archivio checkpoint dell'hub eventi da cui si passa, i metadati propagati descritti in precedenza consentiranno di ignorare gli eventi già gestiti e riprendere esattamente da dove è stato interrotto l'ultimo.
Unire
Il modello di unione ha una o più attività di replica che puntano a una destinazione, possibilmente contemporaneamente ai producer regolari che inviano eventi alla stessa destinazione.
Le varianti di questi patter sono:
- Due o più funzioni di replica acquisiscono simultaneamente eventi da origini separate e li inviano alla stessa destinazione.
- Un'altra funzione di replica che acquisisce eventi da un'origine mentre la destinazione viene usata direttamente dai producer.
- Il modello precedente, ma con mirroring tra due o più hub eventi, determinando gli hub eventi contenenti gli stessi flussi, indipendentemente dalla posizione in cui vengono generati gli eventi.
Le prime due varianti di modello sono semplici e non differiscono dalle attività di replica normale.
L'ultimo scenario richiede l'esclusione di eventi già replicati dalla replica. La tecnica viene illustrata e illustrata nell'esempio EventHubToEventHubMerge .
Editor
Il modello di editor si basa sul modello di replica , ma i messaggi vengono modificati prima che vengano inoltrati.
Esempi di tali modifiche sono:
- Transcodifica: se il contenuto dell'evento (detto anche "corpo" o "payload") arriva dall'origine codificato usando il formato Apache Avro o un formato di serializzazione proprietario, ma l'aspettativa del sistema proprietario della destinazione è che il contenuto venga codificato in formato JSON, un'attività di replica transcodifica prima deserializzerà il payload da Apache Avro in un oggetto grafico in memoria e quindi serializzerà il grafico nel file JSON formato per l'evento che viene inoltrato. La transcodifica include anche attività di compressione e decompressione del contenuto.
- Trasformazione: gli eventi che contengono dati strutturati possono richiedere la riorganizzazione dei dati per semplificare l'utilizzo da parte dei consumer downstream. Questo può comportare operazioni come l'appiattimento di strutture annidate, l'eliminazione di elementi di dati estranei o la modifica del payload per adattarsi esattamente a uno schema specifico.
- Invio in batch: gli eventi possono essere ricevuti in batch (più eventi in un singolo trasferimento) da un'origine, ma devono essere inoltrati in modo singly a una destinazione o viceversa. Un'attività può quindi inoltrare più eventi in base a un singolo trasferimento di eventi di input o aggregare un set di eventi che vengono quindi trasferiti insieme.
- Convalida: i dati degli eventi provenienti da origini esterne spesso devono essere verificati se sono conformi a un set di regole prima che vengano inoltrati. Le regole possono essere espresse usando schemi o codice. Gli eventi che non devono essere conformi possono essere eliminati, con il problema annotato nei log o possono essere inoltrati a una destinazione di destinazione speciale per gestirli ulteriormente.
- Arricchimento : i dati degli eventi provenienti da alcune origini potrebbero richiedere l'arricchimento con un contesto aggiuntivo perché sia utilizzabile nei sistemi di destinazione. Ciò può comportare la ricerca dei dati di riferimento e l'incorporamento dei dati con l'evento o l'aggiunta di informazioni sull'origine nota all'attività di replica, ma non contenute negli eventi.
- Filtro: alcuni eventi provenienti da un'origine potrebbero dover essere trattenuti dalla destinazione in base a una regola. Un filtro verifica l'evento in base a una regola e elimina l'evento se l'evento non corrisponde alla regola. Filtrare gli eventi duplicati osservando determinati criteri ed eliminando gli eventi successivi con gli stessi valori è una forma di filtro.
- Crittografia: un'attività di replica potrebbe dover decrittografare il contenuto proveniente dall'origine e/o crittografare il contenuto inoltrato a una destinazione e/o deve verificare l'integrità del contenuto e dei metadati relativi a una firma eseguita nell'evento o allegare tale firma.
- Attestazione : un'attività di replica può allegare metadati, potenzialmente protetti da una firma digitale, a un evento che attesta che l'evento è stato ricevuto tramite un canale specifico o in un momento specifico.
- Concatenamento : un'attività di replica può applicare firme ai flussi di eventi in modo che l'integrità del flusso sia protetta e che gli eventi mancanti possano essere rilevati.
I modelli di trasformazione, invio in batch e arricchimento sono in genere implementati meglio con i processi di Analisi di flusso di Azure.
Tutti questi modelli possono essere implementati usando Funzioni di Azure, usando il trigger di Hub eventi per l'acquisizione di eventi e l'associazione di output dell'hub eventi per il recapito.
Definizione dei percorsi di trasferimento
Il modello di routing si basa sul modello di replica , ma invece di avere un'origine e una destinazione, l'attività di replica ha più destinazioni, illustrate qui in C#:
[FunctionName("EH2EH")]
public static async Task Run(
[EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
[EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
[EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
ILogger log)
{
foreach (EventData eventData in events)
{
// send to output1 and/or output2 based on criteria
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
});
EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
});
}
}
La funzione di routing considererà i metadati del messaggio e/o il payload del messaggio e quindi scegli una delle destinazioni disponibili a cui inviare.
In Analisi di flusso di Azure è possibile ottenere lo stesso risultato definendo più output e quindi eseguendo una query per output.
select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2
Proiezione log
Il modello di proiezione del log rende flat il flusso di eventi in un database indicizzato, con eventi che diventano record nel database. In genere, gli eventi vengono aggiunti alla stessa raccolta o tabella e la chiave di partizione dell'hub eventi diventa parte della chiave primaria che cerca di rendere univoco il record.
La proiezione di log può produrre uno storico della serie temporale dei dati dell'evento o una visualizzazione compattata, in cui solo l'evento più recente viene mantenuto per ogni chiave di partizione. La forma del database di destinazione dipende dall'utente e dalle esigenze dell'applicazione. Questo modello viene anche definito "origine eventi".
Suggerimento
È possibile creare facilmente proiezioni di log in database SQL di Azure e Azure Cosmos DB in Analisi di flusso di Azure ed è consigliabile scegliere questa opzione.
La funzione di Azure seguente proietta il contenuto di un hub eventi compattato in una raccolta di Azure Cosmos DB.
[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
[EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
[CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
ILogger log)
{
foreach (var ev in input)
{
if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
{
var record = new
{
id = ev.SystemProperties.PartitionKey,
data = JsonDocument.Parse(ev.Body),
properties = ev.Properties
};
await output.AddAsync(record);
}
}
}