Compartir a través de


Spring Cloud Stream con Azure Event Hubs

En este tutorial se muestra cómo enviar y recibir mensajes mediante Azure Event Hubs y Spring Cloud Stream Binder EventHubs en una aplicación de Spring Boot.

Requisitos previos

Nota:

Para conceder a su cuenta acceso a los recursos, en Azure Event Hubs, asigne el rol Azure Event Hubs Data Receiver y Azure Event Hubs Data Sender a la cuenta de Microsoft Entra que está usando actualmente. A continuación, en la cuenta de Azure Storage, asigne el rol Storage Blob Data Contributor a la cuenta de Microsoft Entra que está usando actualmente. Para obtener más información sobre cómo conceder roles de acceso, consulte Asignación de roles de Azure mediante Azure Portal y Autorización del acceso a recursos de Event Hubs mediante Microsoft Entra ID.

Importante

Se necesita Spring Boot versión 2.5 o superior para completar los pasos descritos en este tutorial.

Envío y recepción de mensajes de Azure Event Hubs

Con una cuenta de Azure Storage y un centro de eventos de Azure, puede enviar y recibir mensajes mediante Spring Cloud Azure Stream Binder Event Hubs.

Para instalar el módulo Spring Cloud Azure Stream Binder Event Hubs, agregue las siguientes dependencias al archivo pom.xml:

  • La lista de materiales (BOM) de Azure de Spring Cloud:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.18.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Nota:

    Si usa Spring Boot 2.x, asegúrese de establecer la versión de spring-cloud-azure-dependencies en 4.19.0. Esta lista de materiales (BOM) debe configurarse en la sección <dependencyManagement> del archivo pom.xml. Esto garantiza que todas las dependencias de Spring Cloud Azure usen la misma versión. Para obtener más información sobre la versión que se usa para esta lista de materiales, consulte Qué versión de Spring Cloud Azure debo usar.

  • El artefacto de Spring Cloud Azure Stream Binder Event Hubs:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Incorporación del código de la aplicación

Siga estos pasos para configurar la aplicación para generar y consumir mensajes mediante Azure Event Hubs.

  1. Configure las credenciales del centro de eventos agregando las siguientes propiedades al archivo application.properties.

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    En la siguiente tabla se describen los campos de la configuración:

    Campo Descripción
    spring.cloud.azure.eventhubs.namespace Especifique el espacio de nombres que obtuvo en el centro de eventos desde Azure Portal.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Especifique la cuenta de almacenamiento que creó en este tutorial.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Especifique el contenedor de la cuenta de almacenamiento.
    spring.cloud.stream.bindings.consume-in-0.destination Especifique el centro de eventos que usó en este tutorial.
    spring.cloud.stream.bindings.consume-in-0.group Especifique los grupos de consumidores de la instancia de Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Especifique el mismo centro de eventos que usó en este tutorial.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Especifique MANUAL.
    spring.cloud.function.definition Especifique el bean funcional que se va a enlazar a los destinos externos expuestos por los enlaces.
    spring.cloud.stream.poller.initial-delay Especifique el retraso inicial para los desencadenadores periódicos. El valor predeterminado es 0.
    spring.cloud.stream.poller.fixed-delay Especifique un retraso fijo para el sondeador predeterminado en milisegundos. El valor predeterminado es 1000 L.
  2. Edite el archivo de clase de inicio para mostrar el siguiente contenido.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Sugerencia

    En este tutorial, no hay ninguna operación de autenticación en las configuraciones ni en el código. Sin embargo, la conexión a los servicios de Azure requiere autenticación. Para completar la autenticación, debe usar Azure Identity. Spring Cloud Azure usa DefaultAzureCredential, que la biblioteca de Azure Identity proporciona para ayudarle a obtener credenciales sin cambios en el código.

    DefaultAzureCredential admite varios métodos de autenticación y determina qué método se usa en tiempo de ejecución. Este enfoque permite que la aplicación use diferentes métodos de autenticación en distintos entornos, como entornos locales o de producción, sin implementar código específico del entorno. Para obtener más información, vea DefaultAzureCredential.

    Para completar la autenticación en entornos de desarrollo local, puede usar la CLI de Azure, Visual Studio Code, PowerShell u otros métodos. Para obtener más información, consulte Autenticación de Azure en entornos de desarrollo de Java. Para completar la autenticación en entornos de hospedaje de Azure, se recomienda usar la identidad administrada asignada por el usuario. Para obtener más información, consulta ¿Qué son las identidades administradas para recursos de Azure?

  3. Inicie la aplicación. Los mensajes como este se publicarán en el registro de la aplicación, como se muestra en la salida del ejemplo siguiente:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Implementación en Azure Spring Apps

Ahora que tiene la aplicación Spring Boot que se ejecuta localmente, es el momento de moverla a producción. Azure Spring Apps facilita la implementación de aplicaciones de Spring Boot en Azure sin necesidad de realizar cambios en el código. El servicio administra la infraestructura de las aplicaciones de Spring, con el fin de que los desarrolladores puedan centrarse en el código. Azure Spring Apps proporciona administración del ciclo de vida mediante el uso de una supervisión y un diagnóstico completos, administración de la configuración, detección de servicios, integración de CI/CD e implementaciones azul-verde, entre otros. Para implementar la aplicación en Azure Spring Apps, consulte Implementación de la primera aplicación en Azure Spring Apps.

Pasos siguientes