開始使用 Azure Cosmos DB 分析存放區中的異動資料擷取
適用於:NoSQL MongoDB
使用 Azure Cosmos DB 分析存放區中的異動資料擷取 (CDC) 作為 Azure Data Factory 或 Azure Synapse Analytics 的來源,以擷取資料的特定變更。
注意
請注意,還無法在資料流程上使用適用於 Azure Cosmos DB for MongoDB API 的連結服務介面。 不過,除非支援 Mongo 連結服務,否則您將能夠搭配使用帳戶的文件端點與 "Azure Cosmos DB for NoSQL" 連結服務介面,以作為因應措施。 在 NoSQL 連結服務上,選擇 [手動輸入] 以提供 Cosmos DB 帳戶資訊,並使用帳戶的文件端點 (例如:https://[your-database-account-uri].documents.azure.com:443/
),而不是 MongoDB 端點 (例如:mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/
)
必要條件
- 現有的 Azure Cosmos DB 帳戶。
- 如果您有 Azure 訂用帳戶,則請建立新的帳戶。
- 如尚未擁有 Azure 訂用帳戶,請在開始之前先建立免費帳戶。
- 或者,您可以在認可之前免費試用 Azure Cosmos DB。
啟用分析存放區
首先,在帳戶層級啟用 Azure Synapse Link,然後針對適合您工作負載的容器啟用分析存放區。
啟用 Azure Synapse Link:啟用適用於 Azure Cosmos DB 帳戶的 Azure Synapse Link
啟用容器的分析存放區:
選項 指南 針對特定新容器啟用 為新容器啟用 Azure Synapse Link 針對特定現有容器啟用 為現有容器啟用 Azure Synapse Link
使用資料流程建立目標 Azure 資源
分析存放區的異動資料擷取功能可以透過 Azure Data Factory 或 Azure Synapse Analytics 的資料流程功能取得。 在此指南中,使用 Azure Data Factory。
重要
您也可以使用 Azure Synapse Analytics。 首先,如果您還沒有 Azure Synapse 工作區,則請建立 Azure Synapse 工作區。 在新建立的工作區內,選取 [開發] 索引標籤,並選取 [新增資源],然後選取 [資料流程]。
如果您還沒有 Azure Data Factory,則請建立 Azure Data Factory。
提示
可能的話,請在 Azure Cosmos DB 帳戶所在的相同區域中建立資料處理站。
啟動新建立的資料處理站。
在資料處理站中,選取 [資料流程] 索引標籤,然後選取 [新增資料流程]。
提供新建立資料流程的唯一名稱。 在此範例中,資料流程命名為
cosmoscdc
。
設定分析存放區容器的來源設定
現在建立和設定來源,以從 Azure Cosmos DB 帳戶的分析存放區流動資料。
選取新增來源。
在 [輸出資料流名稱] 欄位中,輸入 cosmos。
在 [來源類型] 區段中,選取 [內嵌]。
在 [資料集] 欄位中,選取 [Azure - Azure Cosmos DB for NoSQL]。
為您的帳戶建立名為 cosmoslinkedservice 的新連結服務。 在 [新增連結服務] 快顯對話方塊中選取現有的 Azure Cosmos DB for NoSQL 帳戶,然後選取 [確定]。 在此範例中,我們選取預先存在且名為
msdocs-cosmos-source
的 Azure Cosmos DB for NoSQL 帳戶,以及名為cosmicworks
的資料庫。針對存放區類型,選取 [分析]。
選取 [來源選項] 索引標籤。
在 [來源選項] 內,選取您的目標容器,然後啟用 [資料流程偵錯]。 在此範例中,容器命名為
products
。選取 [資料流程偵錯]。 在 [開啟資料流程偵錯] 快顯對話方塊中,保留預設選項,然後選取 [確定]。
[來源選項] 索引標籤也包含您可能想要啟用的其他選項。 此表描述這些選項:
選項 | 描述 |
---|---|
擷取中繼更新 | 如果您想要擷取項目的變更歷程記錄 (包括異動資料擷取讀取之間的中繼變更),則請啟用此選項。 |
擷取刪除 | 啟用此選項以擷取使用者已刪除記錄,並將其套用至接收器。 無法在 Azure 資料總管和 Azure Cosmos DB 接收器上套用刪除。 |
擷取交易存放區 TTL | 啟用此選項以擷取 Azure Cosmos DB 交易存放區 (存留時間) TTL 已刪除記錄,並套用至接收器。 無法在 Azure 資料總管和 Azure Cosmos DB 接收器上套用 TTL 刪除。 |
Batchsize (以位元組為單位) | 此設定實際上是 GB。 如果您想要批次處理異動資料擷取摘要,則請以 GB 為單位來指定大小 |
額外設定 | 額外 Azure Cosmos DB 分析存放區設定和其值。 (例如:spark.cosmos.allowWhiteSpaceInFieldNames -> true ) |
使用來源選項
當您核取任一 Capture intermediate updates
、Capture Deltes
和 Capture Transactional store TTLs
選項時,CDC 程序將會使用下列值來建立和填入接收中的 __usr_opType
欄位:
值 | Description | 選項 |
---|---|---|
1 | UPDATE | 擷取中繼更新 |
2 | INSERT | 插入沒有選項,預設為開啟 |
3 | USER_DELETE | 擷取刪除 |
4 | TTL_DELETE | 擷取交易存放區 TTL |
如果您必須區分 TTL 已刪除記錄與使用者或應用程式所刪除的文件,則請檢查 Capture intermediate updates
和 Capture Transactional store TTLs
選項。 然後,您必須適應 CDC 程序或應用程式或是查詢,以根據您的業務需求所需內容來使用 __usr_opType
。
提示
如果需要下游取用者在核取「擷取中繼更新」選項來還原更新順序,則系統時間戳記 _ts
欄位可以用作排序欄位。
建立與設定更新和刪除作業的接收器設定
首先,建立直接 Azure Blob 儲存體接收器,然後將接收器設定為只篩選特定作業的資料。
建立 Azure Blob 儲存體帳戶和容器 (如果您還沒有的話)。 在接下來的範例中,我們將使用名為
msdocsblobstorage
的帳戶以及名為output
的容器。提示
可能的話,請在 Azure Cosmos DB 帳戶所在的相同區域中建立儲存體帳戶。
回到 Azure Data Factory,針對從
cosmos
來源所擷取的異動資料建立新的接收器。提供接收器的唯一名稱。 在此範例中,接收器命名為
storage
。在 [接收器類型] 區段中,選取 [內嵌]。 在 [資料集] 欄位中,選取 [差異]。
使用 Azure Blob 儲存體,為您的帳戶建立名為 storagelinkedservice 的新連結服務。 在 [新增連結服務] 快顯對話方塊中選取現有的 Azure Blob 儲存體帳戶,然後選取 [確定]。 在此範例中,我們選取預先存在且名為
msdocsblobstorage
的 Azure Blob 儲存體帳戶。選取 [設定] 索引標籤。
在 [設定] 內,將 [資料夾路徑] 設定為 Blob 容器的名稱。 在此範例中,容器名稱為
output
。找出 [Update 方法] 區段,然後將選取範圍變更為只允許 delete 和 update 作業。 此外,使用欄位
{_rid}
作為唯一識別碼,將 [索引鍵資料行] 指定為 [資料行清單]。選取 [驗證],以確保您尚未發生任何錯誤或遺漏。 然後,選取 [發佈] 以發佈資料流程。
排程異動資料擷取執行
發佈資料流程之後,您可以新增管線來移動和轉換資料。
建立新管線。 提供管線的唯一名稱。 在此範例中,管線命名為
cosmoscdcpipeline
。在 [活動] 區段中,展開 [移動和轉換] 選項,然後選取 [資料流程]。
提供資料流程活動的唯一名稱。 在此範例中,活動命名為
cosmoscdcactivity
。在 [設定] 索引標籤中,選取您稍早在本指南中建立且名為
cosmoscdc
的資料流程。 然後,根據工作負載的資料量和所需延遲,來選取計算大小。提示
針對大於 100 GB 的增量資料大小,建議使用具有 32 個核心計數 (+16 個驅動程式核心) 的「自訂」大小。
選取 [新增觸發程序]。 排程此管線以適合您工作負載的步調來執行。 在此範例中,管線設定成每五分鐘執行一次。
注意
異動資料擷取執行的最低週期性時間範圍是一分鐘。
選取 [驗證],以確保您尚未發生任何錯誤或遺漏。 然後,選取 [發佈] 以發佈管線。
使用 Azure Cosmos DB 分析存放區異動資料擷取,來觀察放入 Azure Blob 儲存體容器中作為資料流程輸出的資料。
注意
初始叢集啟動時間最多可能需要三分鐘的時間。 若要避免後續異動資料擷取執行中的叢集啟動時間,請設定資料流程叢集「存留時間」值。 如需整合執行階段和 TTL 的詳細資訊,請參閱 Azure Data Factory 中的整合執行階段。
並行作業
來源選項中的批次大小,或是接收器緩慢擷取變更資料流的情況,可能會導致同時執行多個作業。 若要避免這種情況,請在 [管線] 設定中將 [並行] 選項設定為 1,以確保在目前執行完成之前不會觸發新的執行。