다음을 통해 공유


Azure Cosmos DB for NoSQL 및 변경 피드 프로세서를 사용하는 Java 애플리케이션을 만드는 방법

적용 대상: NoSQL

Azure Cosmos DB는 Microsoft에서 제공하는 완전 관리형 NoSQL 데이터베이스 서비스입니다. 이를 통해 전역적으로 분산되고 확장성이 뛰어난 애플리케이션을 쉽게 빌드할 수 있습니다. 이 방법 가이드에서는 Azure Cosmos DB for NoSQL 데이터베이스를 사용하는 Java 애플리케이션을 만드는 프로세스를 안내하고 실시간 데이터 처리를 위해 변경 피드 프로세서를 구현합니다. Java 애플리케이션은 Azure Cosmos DB Java SDK v4를 사용하여 Azure Cosmos DB for NoSQL과 통신합니다.

Important

이 자습서는 Azure Cosmos DB Java SDK v4 전용입니다. 자세한 내용은 Azure Cosmos DB Java SDK v4 릴리스 정보, Maven 리포지토리, Azure Cosmos DB의 변경 피드 프로세서 및 Azure Cosmos DB Java SDK v4 문제 해결 가이드 를 참조하세요. 현재 v4보다 이전 버전을 사용하는 경우 v4로 업그레이드하는 데 도움이 필요하면 Azure Cosmos DB Java SDK v4로 마이그레이션 가이드를 참조하세요.

필수 조건

  • Azure Cosmos DB 계정: Azure Portal에서 만들거나 Azure Cosmos DB 에뮬레이터도 사용할 수 있습니다.

  • Java 개발 환경: 8개 이상의 버전으로 컴퓨터에 JDK(Java Development Kit)가 설치되어 있는지 확인합니다.

  • Azure Cosmos DB Java SDK V4: Azure Cosmos DB와 상호 작용하는 데 필요한 기능을 제공합니다.

배경

Azure Cosmos DB 변경 피드는 많은 용도로 사용되는 문서 삽입에 대한 응답으로 작업을 트리거하는 이벤트 기반 인터페이스를 제공합니다.

변경 피드 이벤트 관리 작업은 SDK에 내장된 변경 피드 프로세서 라이브러리에서 주로 처리됩니다. 이 라이브러리는 필요한 경우 다수의 작업자에게 변경 피드 이벤트를 배포할 수 있을 만큼 강력합니다. 변경 피드 라이브러리에 콜백을 제공하기만 하면 됩니다.

Java 애플리케이션의 간단한 예제는 Azure Cosmos DB 및 변경 피드 프로세서를 사용하여 실시간 데이터 처리를 보여 줍니다. 애플리케이션은 샘플 문서를 "피드 컨테이너"에 삽입하여 데이터 스트림을 시뮬레이션합니다. 피드 컨테이너에 바인딩된 변경 피드 프로세서는 들어오는 변경 내용을 처리하고 문서 콘텐츠를 기록합니다. 프로세서는 병렬 처리를 위한 임대를 자동으로 관리합니다.

소스 코드

SDK 예제 리포지토리를 복제하고 다음에서 이 예제를 찾을 수 있습니다 SampleChangeFeedProcessor.java.

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

연습

  1. ChangeFeedProcessorOptions Azure Cosmos DB 및 Azure Cosmos DB Java SDK V4를 사용하여 Java 애플리케이션에서 구성합니다. 데이터 ChangeFeedProcessorOptions 처리 중에 변경 피드 프로세서의 동작을 제어하기 위한 필수 설정을 제공합니다.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. 호스트 이름, 피드 컨테이너, 임대 컨테이너 및 데이터 처리 논리를 포함하여 관련 구성을 사용하여 ChangeFeedProcessor를 초기화합니다. start() 메서드는 데이터 처리를 시작하여 피드 컨테이너에서 들어오는 데이터 변경 내용을 동시에 실시간으로 처리할 수 있도록 합니다.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. 메서드를 사용하여 들어오는 데이터 변경 내용을 처리하는 대리자를 지정합니다 handleChanges() . 이 메서드는 변경 피드에서 받은 JsonNode 문서를 처리합니다. 개발자는 변경 피드에서 제공한 JsonNode 문서를 처리하는 두 가지 옵션을 사용할 수 있습니다. 한 가지 옵션은 JsonNode 형식으로 문서에서 작업하는 것입니다. 이는 특히 모든 문서에 대한 단일 균일한 데이터 모델이 없는 경우에 적합합니다. 두 번째 옵션 - JsonNode와 구조가 동일한 POJO로 JsonNode를 변환합니다. 그런 다음 POJO에서 작동할 수 있습니다.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Java 애플리케이션을 빌드하고 실행합니다. 애플리케이션은 변경 피드 프로세서를 시작하고, 샘플 문서를 피드 컨테이너에 삽입하고, 들어오는 변경 내용을 처리합니다.

결론

이 가이드에서는 Azure Cosmos DB for NoSQL 데이터베이스를 사용하고 실시간 데이터 처리를 위해 변경 피드 프로세서를 사용하는 Azure Cosmos DB Java SDK V4를 사용하여 Java 애플리케이션을 만드는 방법을 알아보았습니다. 이 애플리케이션을 확장하여 더 복잡한 사용 사례를 처리하고 Azure Cosmos DB를 사용하여 강력하고 확장 가능하며 전역적으로 분산된 애플리케이션을 빌드할 수 있습니다.

추가 리소스

다음 단계

이제 다음 문서에서 변경 피드 예측 도구에 대해 자세히 알아볼 수 있습니다.