如何建立 Java 應用程式,以使用 Azure Cosmos DB for NoSQL 和變更摘要處理器
適用於:NoSQL
Azure Cosmos DB 是 Microsoft 提供的完全受控 NoSQL 資料庫。 這可讓您輕鬆建置全域散發和高可調整性的應用程式。 本操作指南會逐步引導您完成建立 JAVA 應用程式的流程,該應用程式會使用 Azure Cosmos DB for NoSQL 資料庫,並實作變更摘要處理器以進行即時資料處理。 Java 應用程式會使用 Azure Cosmos DB Java SDK v4 來與 Azure Cosmos DB for NoSQL 通訊。
重要
本教學課程僅適用於 Azure Cosmos DB Java SDK v4。 如需詳細資訊,請檢視 Azure Cosmos DB JAVA SDK v4 版本資訊、Maven 存放庫、Azure Cosmos DB 中的變更摘要流程和 Azure Cosmos DB JAVA SDK v4 疑難排解指南。 如果您目前使用的版本比 v4 舊,請參閱遷移至 Azure Cosmos DB Java SDK v4 指南,以取得升級至 v4 的協助。
必要條件
Azure Cosmos DB 帳戶:您也可以從 Azure 入口網站建立帳戶,或者也可以使用 Azure Cosmos DB 模擬器。
JAVA 開發環境:請確定您的機器上已安裝至少 8 版本的 JAVA 開發套件 (JDK)。
Azure Cosmos DB JAVA SDK V4:提供與 Azure Cosmos DB 互動的必要功能。
背景
Azure Cosmos DB 變更摘要會提供事件驅動介面,以觸發動作來回應有許多用途的文件插入。
管理變更摘要事件的工作,主要是由 SDK 內建的變更摘要處理器程式庫負責處理。 此程式庫的功能強大,足以將變更摘要事件分散於多個背景工作角色 (如有需要的話)。 您只需要對變更摘要程式庫提供回呼。
這個簡單的 JAVA 應用程式範例示範使用 Azure Cosmos DB 和變更摘要處理器進行即時資料處理。 應用程式會將範例文件插入「摘要容器」,以模擬資料流。 繫結至摘要容器的變更摘要處理器,會處理傳入的變更並記錄文件內容。 處理器會自動管理用於平行處理的租用時間。
原始程式碼
您可以複製 SDK 範例存放庫,並在 SampleChangeFeedProcessor.java
中找到此範例:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/
逐步解說
使用 Azure Cosmos DB 和 Azure Cosmos DB Java SDK V4 在 JAVA 應用程式中設定
ChangeFeedProcessorOptions
。ChangeFeedProcessorOptions
提供基本設定來控制資料處理期間變更摘要處理器的行為。options = new ChangeFeedProcessorOptions(); options.setStartFromBeginning(false); options.setLeasePrefix("myChangeFeedDeploymentUnit"); options.setFeedPollDelay(Duration.ofSeconds(5)); options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
使用相關的設定初始化 ChangeFeedProcessor ,包括主機名稱、摘要容器、租用容器和資料處理邏輯。 start() 方法會開始資料處理,以便從摘要容器同時和即時處理傳入的資料變更。
logger.info("Start Change Feed Processor on worker (handles changes asynchronously)"); ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder() .hostName("SampleHost_1") .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(handleChanges()) .options(options) .buildChangeFeedProcessor(); changeFeedProcessorInstance.start() .subscribeOn(Schedulers.boundedElastic()) .subscribe();
使用
handleChanges()
方法指定委派處理傳入資料變更。 方法會處理來自變更摘要的已接收 JsonNode 文件。 身為開發人員,您有兩個選項可用來處理變更摘要提供給您的 JsonNode 文件。 其中一個選項是以 JsonNode 形式操作文件。 這非常棒,特別是如果您所有文件沒有單一整齊的資料模型。 第二個選項 - 將 JsonNode 轉換成與 JsonNode 具有相同結構的 POJO。 然後,您可以在 POJO 上操作。private static Consumer<List<JsonNode>> handleChanges() { return (List<JsonNode> docs) -> { logger.info("Start handleChanges()"); for (JsonNode document : docs) { try { //Change Feed hands the document to you in the form of a JsonNode //As a developer you have two options for handling the JsonNode document provided to you by Change Feed //One option is to operate on the document in the form of a JsonNode, as shown below. This is great //especially if you do not have a single uniform data model for all documents. logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter() .writeValueAsString(document)); //You can also transform the JsonNode to a POJO having the same structure as the JsonNode, //as shown below. Then you can operate on the POJO. CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class); logger.info("id: " + pojo_doc.getId()); } catch (JsonProcessingException e) { e.printStackTrace(); } } isWorkCompleted = true; logger.info("End handleChanges()"); }; }
建置和執行 JAVA 應用程式。 應用程式會啟動變更摘要處理器、將範例文件插入摘要容器,以及處理傳入的變更。
推論
在本指南中,您將了解如何使用 Azure Cosmos DB JAVA SDK V4 建立 Java 應用程式,該應用程式會使用 Azure Cosmos DB for NoSQL 資料庫,並使用變更摘要處理器以進行即時資料處理。 您可以擴充此應用程式來處理更複雜的使用案例,並使用 Azure Cosmos DB 建置健全、可調整且全域散發的應用程式。
其他資源
下一步
您現在可以在下列文章中繼續深入了解變更摘要估算器: