Condividi tramite


Introduzione a Change Data Capture nell'archivio analitico per Azure Cosmos DB

SI APPLICA A: NoSQL MongoDB

Usare Change Data Capture (CDC) nell'archivio analitico di Azure Cosmos DB come origine per Azure Data Factory o Azure Synapse Analytics per acquisire modifiche specifiche ai dati.

Nota

Si noti che l'interfaccia del servizio collegato per l'API Azure Cosmos DB for MongoDB non è ancora disponibile nel flusso di dati. Tuttavia, è possibile usare l'endpoint documento dell'account con l'interfaccia del servizio collegato "Azure Cosmos DB for NoSQL" come soluzione alternativa fino a quando non sarà supportato il servizio collegato Mongo. In un servizio collegato NoSQL scegliere "Immetti manualmente" per fornire le informazioni sull'account Cosmos DB e usare l'endpoint documento dell'account (ad esempio, https://[your-database-account-uri].documents.azure.com:443/) anziché l'endpoint MongoDB (ad esempio, mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

Prerequisiti

Abilitare l'archivio analitico

Prima di tutto, abilitare Collegamento ad Azure Synapse a livello dell'account e quindi abilitare l'archivio analitico per i contenitori appropriato per il carico di lavoro.

  1. Abilitare Collegamento ad Azure Synapse: Abilitare Collegamento ad Azure Synapse per un account Azure Cosmos DB

  2. Abilitare l'archivio analitico per i contenitori:

    Opzione Guida
    Abilitare per un nuovo contenitore specifico Abilitare Collegamento ad Azure Synapse per i nuovi contenitori
    Abilitare per un contenitore specifico esistente Abilitare Collegamento ad Azure Synapse per i contenitori esistenti

Creare una risorsa di Azure di destinazione usando i flussi di dati

La funzionalità Change Data Capture dell'archivio analitico è disponibile tramite la funzionalità flusso di dati di Azure Data Factory o Azure Synapse Analytics. Per questa guida, usare Azure Data Factory.

Importante

In alternativa, è possibile usare Azure Synapse Analytics. Prima di tutto, creare un'area di lavoro di Azure Synapse, se non ne è già disponibile una. Nell'area di lavoro appena creata selezionare la scheda Sviluppo, quindi Aggiungi nuova risorsa e infine Flusso di dati.

  1. Creare un'istanza di Azure Data Factory, se non ne è già disponibile una.

    Suggerimento

    Se possibile, creare la data factory nella stessa area in cui risiede l'account Azure Cosmos DB.

  2. Avviare la data factory appena creata.

  3. Nella data factory selezionare la scheda Flussi di dati e quindi selezionare Nuovo flusso di dati.

  4. Assegnare al flusso di dati appena creato un nome univoco. In questo esempio il flusso di dati è denominato cosmoscdc.

    Screnshot di un nuovo flusso di dati con il nome cosmoscdc.

Configurare le impostazioni di origine per il contenitore dell'archivio analitico

Creare e configurare un'origine per il flusso dei dati dall'archivio analitico dell'account Azure Cosmos DB.

  1. Seleziona Aggiungi origine.

    Screenshot dell'opzione di menu Aggiungi origine.

  2. Nel campo Nome flusso di output immettere cosmos.

    Screenshot della denominazione del cosmo di origine appena creato.

  3. Nella sezione Tipo di origine selezionare Inline.

    Screenshot della selezione del tipo di origine inline.

  4. Nel campo Set di dati selezionare Azure - Azure Cosmos DB for NoSQL.

    Screenshot della selezione di Azure Cosmos DB per NoSQL come tipo di set di dati.

  5. Creare un nuovo servizio collegato per l'account denominato cosmoslinkedservice. Selezionare l'account Azure Cosmos DB for NoSQL esistente nella finestra di dialogo popup Nuovo servizio collegato e quindi scegliere OK. In questo esempio viene selezionato un account Azure Cosmos DB for NoSQL preesistente denominato msdocs-cosmos-source e un database denominato cosmicworks.

    Screenshot della finestra di dialogo Nuovo servizio collegato con un account Azure Cosmos DB selezionato.

  6. Selezionare Analitico per il tipo di archivio.

    Screenshot dell'opzione analitica selezionata per un servizio collegato.

  7. Selezionare la scheda Opzioni origine.

  8. In Opzioni origine selezionare il contenitore di destinazione e abilitare Debug del flusso di dati. In questo esempio il contenitore è denominato products.

    Screenshot di un contenitore di origine selezionato per i prodotti denominati.

  9. Selezionare Debug flusso di dati. Nella finestra di dialogo popup Attiva il debug del flusso di dati mantenere le opzioni predefinite e quindi scegliere OK.

    Screenshot dell'opzione Attiva/Disattiva per abilitare il debug del flusso di dati.

  10. La scheda Opzioni origine contiene anche altre opzioni che è possibile abilitare. Questa tabella descrive tali opzioni:

Opzione Descrizione
Acquisisci aggiornamenti intermedi Abilitare questa opzione se si vuole acquisire la cronologia delle modifiche apportate agli elementi, incluse le modifiche intermedie tra le letture di Change Data Capture.
Acquisisci eliminazioni Abilitare questa opzione per acquisire i record eliminati dall'utente e applicarli nel sink. Le eliminazioni non possono essere applicate in Esplora dati di Azure e nei sink di Azure Cosmos DB.
Acquisisci record TTL dell'archivio transazionale Abilitare questa opzione per acquisire i record TTL (Time-To-Live) eliminati dell'archivio transazionale di Azure Cosmos DB e applicarli al sink. Le eliminazioni dei record TTL non possono essere applicate a Esplora dati di Azure e ai sink di Azure Cosmos DB.
Dimensioni batch in byte Questa impostazione è infatti espressa in gigabyte. Specificare le dimensioni in gigabyte se si vuole raggruppare in batch i feed di Change Data Capture
Configurazioni aggiuntive Configurazioni aggiuntive dell'archivio analitico di Azure Cosmos DB e i relativi valori. (Ad esempio: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Utilizzo delle opzioni origine

Quando si seleziona una delle opzioni Capture intermediate updates, Capture Deltes e Capture Transactional store TTLs, il processo CDC creerà e popola il campo __usr_opType nel sink con i valori seguenti:

Valore Descrizione Opzione
1 UPDATE Acquisisci aggiornamenti intermedi
2 INSERT Non esiste un'opzione per gli inserimenti, è attivata per impostazione predefinita
3 USER_DELETE Acquisisci eliminazioni
4 TTL_DELETE Acquisisci record TTL dell'archivio transazionale

Se è necessario distinguere i record TTL eliminati dai documenti eliminati da utenti o applicazioni, è necessario selezionare entrambe le opzioni Capture intermediate updates e Capture Transactional store TTLs. È quindi necessario adattare i processi, le applicazioni o le query CDC per usare __usr_opType in base alle esigenze aziendali.

Suggerimento

Se è necessario che i consumer downstream ripristinino l'ordine degli aggiornamenti con l'opzione "Acquisisci aggiornamenti intermedi" selezionata, il campo _ts del timestamp di sistema può essere usato come campo di ordinamento.

Creare e configurare le impostazioni del sink per le operazioni di aggiornamento ed eliminazione

Prima di tutto, creare un sink di Archiviazione BLOB di Azure e quindi configurare il sink per filtrare i dati solo per operazioni specifiche.

  1. Se non se ne ha già uno, creare un account di Archiviazione BLOB di Azure e un contenitore. Per gli esempi successivi si userà un account denominato msdocsblobstorage e un contenitore denominato output.

    Suggerimento

    Se possibile, creare l'account di archiviazione nella stessa area in cui risiede l'account Azure Cosmos DB.

  2. Tornare ad Azure Data Factory e creare un nuovo sink per i dati delle modifiche acquisiti dall'origine cosmos.

    Screenshot dell'aggiunta di un nuovo sink connesso all'origine esistente.

  3. Assegnare al sink un nome univoco. In questo esempio il sink è denominato storage.

    Screenshot della denominazione dell'archiviazione sink appena creata.

  4. Nella sezione Tipo di sink selezionare Inline. Nel campo Set di dati selezionare Delta.

    Screenshot della selezione e del tipo di set di dati Delta inline per il sink.

  5. Creare un nuovo servizio collegato per l'account denominato storagelinkedservice usando Archiviazione BLOB di Azure. Selezionare l'account di archiviazione BLOB di Azure esistente nella finestra di dialogo popup Nuovo servizio collegato e quindi scegliere OK. In questo esempio viene selezionato un account di archiviazione BLOB di Azure preesistente denominato msdocsblobstorage.

    Screenshot delle opzioni del tipo di servizio per un nuovo servizio collegato Delta.

    Screenshot della finestra di dialogo Nuovo servizio collegato con un account Archiviazione BLOB di Azure selezionato.

  6. Seleziona la scheda Impostazioni.

  7. In Impostazioni impostare Percorso cartella sul nome del contenitore BLOB. In questo esempio il nome del contenitore è output.

    Screenshot del contenitore BLOB denominato output impostato come destinazione sink.

  8. Individuare la sezione Metodo aggiornamento e modificare le selezioni in modo da consentire solo le operazioni di eliminazione e aggiornamento. Specificare anche le Colonne chiave come Elenco di colonne utilizzando il campo {_rid} come identificatore univoco.

    Screenshot dei metodi di aggiornamento e delle colonne chiave specificati per il sink.

  9. Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare il flusso di dati.

    Screenshot dell'opzione per convalidare e quindi pubblicare il flusso di dati corrente.

Pianificare l'esecuzione di Change Data Capture

Dopo la pubblicazione di un flusso di dati, è possibile aggiungere una nuova pipeline per spostare e trasformare i dati.

  1. Crea una nuova pipeline. Assegnare alla pipeline un nome univoco. In questo esempio la pipeline è denominata cosmoscdcpipeline.

    Screenshot della nuova opzione della pipeline all'interno della sezione resources.

  2. Nella sezione Attività espandere l'opzione Sposta e trasforma e quindi selezionare Flusso di dati.

    Screenshot dell'opzione attività flusso di dati all'interno della sezione attività.

  3. Assegnare un nome univoco all'attività del flusso di dati. In questo esempio l'attività è denominata cosmoscdcactivity.

  4. Nella scheda Impostazioni selezionare il flusso di dati denominato cosmoscdc creato in precedenza in questa guida. Selezionare quindi una dimensione di calcolo in base al volume dei dati e alla latenza necessaria per il carico di lavoro.

    Screenshot delle impostazioni di configurazione per il flusso di dati e le dimensioni di calcolo per l'attività.

    Suggerimento

    Per dimensioni incrementali dei dati maggiori di 100 GB, è consigliabile usare le dimensioni Personalizzate con un numero di core pari a 32 (+16 core driver).

  5. Selezionare + Aggiungi trigger. Pianificare l'esecuzione di questa pipeline alla cadenza appropriata per il carico di lavoro. In questo esempio la pipeline è configurata per l'esecuzione ogni cinque minuti.

    Screenshot del pulsante Aggiungi trigger per una nuova pipeline.

    Screenshot di una configurazione del trigger basata su una pianificazione, a partire dall'anno 2023, che viene eseguita ogni cinque minuti.

    Nota

    La finestra di ricorrenza minima per le esecuzioni di Change Data Capture è di un minuto.

  6. Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare la pipeline.

  7. Osservare i dati inseriti nel contenitore di Archiviazione BLOB di Azure come output del flusso di dati usando Change Data Capture nell'archivio analitico di Azure Cosmos DB.

    Screnshot dei file di output dalla pipeline nel contenitore Archiviazione BLOB di Azure.

    Nota

    Il tempo di avvio iniziale del cluster può richiedere fino a tre minuti. Per evitare il tempo di avvio del cluster nelle esecuzioni successive di Change Data Capture, configurare il valore Durata (TTL) del cluster del flusso di dati. Per altre informazioni sul runtime di integrazione e la durata TTL, vedere Runtime di integrazione in Azure Data Factory.

Processi simultanei

Le dimensioni del batch nelle opzioni per l'origine o i casi in cui il sink è lento nell'inserimento del flusso delle modifiche, possono causare l'esecuzione di più processi contemporaneamente. Per evitare questa situazione, impostare l'opzione Concorrenza su 1 nelle impostazioni della pipeline per assicurarsi che non vengano attivate nuove esecuzioni fino al completamento dell'esecuzione corrente.

Passaggi successivi