Compartir vía


Cómo crear una aplicación de Java que utiliza Azure Cosmos DB for NoSQL y el procesador de fuente de cambios

SE APLICA A: NoSQL

Azure Cosmos DB es un servicio de base de datos NoSQL totalmente administrado proporcionado por Microsoft. Permite crear aplicaciones de gran escalabilidad y distribuidas globalmente. Esta guía paso a paso le guía por el proceso de creación de una aplicación Java que usa la base de datos de Azure Cosmos DB for NoSQL e implementa el procesador de fuente de cambios para el procesamiento de datos en tiempo real. La aplicación de Java se comunica con Azure Cosmos DB for NoSQL mediante el SDK de Azure Cosmos DB para Java v4.

Importante

Este tutorial solo es para el SDK de Azure Cosmos DB para Java v4. Consulte las notas de la versión del SDK de Azure Cosmos DB para Java v4, el repositorio de Maven, el procesador de fuente de cambios en Azure Cosmos DB y la guía de solución de problemas del SDK de Azure Cosmos DB para Java v4 para obtener más información. Si en la actualidad usa una versión anterior a v4, vea la guía Migración al SDK de Azure Cosmos DB para Java v4 a fin de obtener ayuda para actualizar a v4.

Requisitos previos

  • Cuenta de Azure Cosmos DB: puede crearla desde Azure Portal o también puede usar el emulador de Azure Cosmos DB.

  • Entorno de desarrollo de Java: asegúrese de que tiene instalado el kit de desarrollo de Java (JDK) en la máquina con, al menos, la versión 8.

  • SDK de Azure Cosmos DB para Java V4: proporciona las características necesarias para interactuar con Azure Cosmos DB.

Fondo

La fuente de cambios de Azure Cosmos DB proporciona una interfaz orientada a eventos para desencadenar acciones en respuesta a la inserción de documentos. Esta herramienta tiene muchos usos.

La biblioteca de procesadores de fuente de cambios integrada en el SDK se ocupa en gran medida del trabajo de administración de eventos de la fuente de cambios. Esta biblioteca es lo suficientemente eficaz como para distribuir eventos de la fuente de cambios entre varios trabajos, si así se desea. Todo lo que tiene que hacer es proporcionar una devolución de llamada a la biblioteca de fuentes de cambios.

Este sencillo ejemplo de aplicación Java muestra el procesamiento de datos en tiempo real con Azure Cosmos DB y el procesador de fuente de cambios. La aplicación inserta documentos de ejemplo en un "contenedor de fuentes" para simular un flujo de datos. El procesador de fuente de cambios, enlazado al contenedor de fuentes, procesa los cambios entrantes y registra el contenido del documento. El procesador administra automáticamente las concesiones para el procesamiento paralelo.

Código fuente

Puede clonar el repositorio de ejemplo del SDK y encontrar este ejemplo en SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Tutorial

  1. Configure ChangeFeedProcessorOptions en una aplicación Java mediante Azure Cosmos DB y el SDK de Azure Cosmos DB para Java V4. El ChangeFeedProcessorOptions proporciona una configuración esencial para controlar el comportamiento del procesador de fuente de cambios durante el procesamiento de datos.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. Inicialice ChangeFeedProcessor con las configuraciones pertinentes, incluidos el nombre del host, el contenedor de alimentación, el contenedor de arrendamiento y la lógica de tratamiento de datos. El método start() inicia el procesamiento de los datos, lo que permite el procesamiento concurrente y en tiempo real de los cambios de datos entrantes procedentes del contenedor de alimentación.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Especifique que el delegado controla los cambios de datos entrantes mediante el método handleChanges(). El método procesa los documentos JsonNode recibidos de la fuente de cambios. Como desarrollador, tiene dos opciones para controlar el documento JsonNode proporcionado por fuente de cambios. Una opción es operar en el documento en forma de JsonNode. Esto es una opción excelente, en especial, si no tiene un único modelo de datos uniforme para todos los documentos. La segunda opción: transforme JsonNode en un POJO que tenga la misma estructura que JsonNode. A continuación, puede operar en el POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Compile y ejecute la aplicación Java. La aplicación inicia el procesador de fuente de cambios, inserta documentos de ejemplo en el contenedor de fuentes y procesa los cambios entrantes.

Conclusión

En esta guía, ha aprendido a crear una aplicación Java mediante el SDK de Azure Cosmos DB para Java V4, que usa la base de datos de Azure Cosmos DB for NoSQL y el procesador de fuente de cambios para el procesamiento de datos en tiempo real. Puede ampliar esta aplicación para controlar casos de uso más complejos y crear aplicaciones sólidas, escalables y distribuidas globalmente mediante Azure Cosmos DB.

Recursos adicionales

Pasos siguientes

Puede obtener más información sobre el estimador de la fuente de cambios en los siguientes artículos: