Compartir a través de


Spring Cloud Soporte técnico de Azure for Spring Cloud Stream

Este artículo se aplica a: ✔️ Versión 4.14.0 ✔️ versión 5.8.0

Spring Cloud Stream es un marco para crear microservicios basados en eventos altamente escalables conectados con sistemas de mensajería compartidos.

El marco proporciona un modelo de programación flexible basado en expresiones y procedimientos recomendados de Spring ya establecidos y conocidos. Estos procedimientos recomendados incluyen compatibilidad con la semántica persistente de pub/sub, grupos de consumidores y particiones con estado.

Las implementaciones actuales del enlazador incluyen:

Spring Cloud Stream Binder para Azure Event Hubs

Conceptos clave

Spring Cloud Stream Binder para Azure Event Hubs proporciona la implementación de enlace para el marco de Spring Cloud Stream. Esta implementación usa adaptadores de canal de Spring Integration Event Hubs en su base. Desde la perspectiva del diseño, Event Hubs es similar a Kafka. Además, se puede acceder a Event Hubs a través de la API de Kafka. Si el proyecto tiene una dependencia estrecha en la API de Kafka, puede probar el centro de eventos con el ejemplo de API de Kafka.

Grupo de consumidores

Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.

Compatibilidad con particiones

Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué consumidor posee la partición. Cuando se inicia un nuevo consumidor, intenta robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.

Para especificar la estrategia de equilibrio de carga, se proporcionan las propiedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* . Para obtener más información, consulte la sección Propiedades del consumidor.

Compatibilidad con consumidores de Batch

El enlazador de Event Hubs de Azure Stream de Spring Cloud admite la característica consumidor de Batch de Spring Cloud Stream.

Para trabajar con el modo de consumidor por lotes, establezca la spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode propiedad trueen . Cuando está habilitado, se recibe un mensaje con una carga de una lista de eventos por lotes y se pasa a la Consumer función . Cada encabezado de mensaje también se convierte en una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola se presentan como un valor único porque todo el lote de eventos comparte el mismo valor. Para obtener más información, consulte la sección Encabezados de mensajes de Event Hubs de Spring Cloud Soporte técnico de Azure for Spring Integration.

Nota:

El encabezado de punto de control solo existe cuando se usa el modo de MANUAL punto de control.

El punto de comprobación del consumidor por lotes admite dos modos: BATCH y MANUAL. BATCH mode es un modo de control automático para controlar todo el lote de eventos una vez que el enlazador los recibe. MANUAL el modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, Checkpointer se pasa al encabezado del mensaje y los usuarios pueden usarlo para realizar puntos de control.

Puede especificar el tamaño del lote estableciendo las max-size propiedades y max-wait-time que tienen un prefijo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. La max-size propiedad es necesaria y la max-wait-time propiedad es opcional. Para obtener más información, consulte la sección Propiedades del consumidor.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Como alternativa, también puede usar Spring Cloud Azure Stream Event Hubs Starter, como se muestra en el ejemplo siguiente de Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuración

El enlazador proporciona las tres partes siguientes de las opciones de configuración:

propiedades de configuración de Conectar ion

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.

Nota:

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Conectar las propiedades configurables de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.enabled boolean Si una instancia de Azure Event Hubs está habilitada.
spring.cloud.azure.eventhubs.connection-string Cadena Espacio de nombres de Event Hubs cadena de conexión valor.
spring.cloud.azure.eventhubs.namespace Cadena Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Cadena Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Cadena Dirección del punto de conexión personalizado.

Sugerencia

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Azure Stream Event Hubs de Spring Cloud. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure. unificado o el prefijo de spring.cloud.azure.eventhubs..

El enlazador también admite Spring Could Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar el cadena de conexión con entidades de seguridad que no se conceden con Data roles relacionados, consulte la sección Uso básico de Spring Could Azure Resource Manager.

Propiedades de configuración del punto de control

Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.

Nota:

Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará ningún contenedor de almacenamiento automáticamente con el nombre de spring.cloud.stream.bindings.binding-name.destination.

Propiedades configurables de puntos de control de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Si quiere permitir la creación de contenedores si no existe.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Cadena Nombre de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Cadena Clave de acceso de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Cadena Nombre del contenedor de almacenamiento.

Sugerencia

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure. unificado o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propiedades de configuración de enlace de Azure Event Hubs

Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.

Propiedades del consumidor

Estas propiedades se exponen a través de EventHubsConsumerProperties.

Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Modo de punto de control usado cuando el consumidor decide cómo controlar el mensaje
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Entero Decide la cantidad de mensaje de cada partición para realizar un punto de control. Solo surtirá efecto cuando PARTITION_COUNT se use el modo de punto de control.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duration Decide el intervalo de tiempo para realizar un punto de control. Solo surtirá efecto cuando TIME se use el modo de punto de control.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Entero Número máximo de eventos en un lote. Necesario para el modo de consumidor por lotes.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duration Duración máxima de tiempo para el consumo por lotes. Solo surtirá efecto cuando el modo de consumidor por lotes esté habilitado y sea opcional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duration Duración del intervalo para la actualización.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Estrategia de equilibrio de carga.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duration Duración de tiempo después de la cual expira la propiedad de la partición.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleano Si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Entero Recuento usado por el consumidor para controlar el número de eventos que el consumidor del centro de eventos recibirá y pondrá en cola de forma activa local.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Asignación con la clave como identificador de partición y valores de StartPositionProperties Mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en el almacén de puntos de control. Este mapa se clave fuera del identificador de partición.

Nota:

La initial-partition-event-position configuración acepta para map especificar la posición inicial de cada centro de eventos. Por lo tanto, su clave es el identificador de partición, y el valor es de StartPositionProperties , que incluye propiedades de desplazamiento, número de secuencia, fecha y hora de puesta en cola y si son inclusivos. Por ejemplo, puede establecerlo como

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuración avanzada del consumidor

La conexión anterior, el punto de control y la configuración común del cliente del SDK de Azure admiten la personalización de cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Propiedades del productor

Estas propiedades se exponen a través de EventHubsProducerProperties.

Propiedades configurables del productor de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean Marca de conmutador para la sincronización del productor. Si es true, el productor esperará una respuesta después de una operación de envío.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long Cantidad de tiempo para esperar una respuesta después de una operación de envío. Solo surtirá efecto cuando un productor de sincronización esté habilitado.
Configuración avanzada del productor

La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada productor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Uso básico

Envío y recepción de mensajes desde y hacia Event Hubs

  1. Rellene las opciones de configuración con información de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Defina proveedor y consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Compatibilidad con particiones

Se crea un PartitionSupplier objeto con información de partición proporcionada por el usuario para configurar la información de partición sobre el mensaje que se va a enviar. En el diagrama de flujo siguiente se muestra el proceso de obtención de diferentes prioridades para el identificador y la clave de partición:

Diagram showing a flowchart of the partitioning support process.

Compatibilidad con consumidores de Batch

  1. Proporcione las opciones de configuración por lotes, como se muestra en el ejemplo siguiente:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Defina proveedor y consumidor.

    Para el modo de punto de control como BATCH, puede usar el código siguiente para enviar mensajes y consumir en lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Para el modo de punto de comprobación como MANUAL, puede usar el código siguiente para enviar mensajes y consumir o punto de control en lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nota:

En el modo de consumo por lotes, el tipo de contenido predeterminado del enlazador de Spring Cloud Stream es application/json, por lo que asegúrese de que la carga del mensaje esté alineada con el tipo de contenido. Por ejemplo, cuando se usa el tipo de contenido predeterminado de application/json para recibir mensajes con String carga, la carga debe estar JSON Stringrodeada de comillas dobles para el texto original String . Mientras que para text/plain el tipo de contenido, puede ser un String objeto directamente. Para más información, consulte Negociación de tipos de contenido de Spring Cloud Stream.

Control de los mensajes de error

  • Control de mensajes de error de enlace de salida

    De forma predeterminada, Spring Integration crea un canal de error global denominado errorChannel. Configure el siguiente punto de conexión de mensaje para controlar los mensajes de error de enlace salientes:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Control de mensajes de error de enlace de entrada

    Spring Cloud Stream Event Hubs Binder admite dos soluciones para controlar los errores de los enlaces de mensajes entrantes: canales de error y controladores personalizados.

    Canal de error:

    Spring Cloud Stream proporciona un canal de error para cada enlace entrante. ErrorMessage Se envía al canal de error. Para más información, consulte Control de errores en la documentación de Spring Cloud Stream.

    • Canal de error predeterminado

      Puede usar un canal de error global denominado errorChannel para consumir todos los mensajes de error de enlace entrantes. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Canal de error específico del enlace

      Puede usar un canal de error específico para consumir los mensajes de error de enlace entrante específicos con una prioridad más alta que el canal de error predeterminado. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Nota:

      El canal de error específico del enlace se excluye mutuamente con otros controladores de errores y canales proporcionados.

    Controlador de errores:

    Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado mediante la adición de un Consumer que acepta ErrorMessage instancias. Para obtener más información, consulte Control de errores en la documentación de Spring Cloud Stream.

    Nota:

    Cuando se configura cualquier controlador de errores de enlace, puede funcionar con el canal de error predeterminado.

    • Controlador de errores predeterminado de enlace

      Configure un único Consumer bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la spring.cloud.stream.default.error-handler-definition propiedad en el nombre de la función.

    • Controlador de errores específico del enlace

      Configure un Consumer bean para consumir los mensajes de error de enlace de entrada específicos. La siguiente función se suscribe al canal de error de enlace de entrada específico y tiene una prioridad más alta que el controlador de errores predeterminado de enlace:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition propiedad en el nombre de la función.

Encabezados de mensaje de Event Hubs

Para conocer los encabezados de mensaje básicos admitidos, consulte la sección Encabezados de mensaje de Event Hubs de Spring Cloud Soporte técnico de Azure for Spring Integration.

Compatibilidad con varios enlazador

Conectar ion a varios espacios de nombres de Event Hubs también se admite mediante varios enlazadores. En este ejemplo se toma un cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas. Puede establecer propiedades relacionadas en la configuración del entorno de cada enlazador.

  1. Para usar varios enlazadores con Event Hubs, configure las siguientes propiedades en el archivo application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota:

    El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Necesitamos definir dos proveedores y dos consumidores:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Aprovisionamiento de recursos

El enlazador de Event Hubs admite el aprovisionamiento del centro de eventos y el grupo de consumidores, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

Ejemplos

Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.

Spring Cloud Stream Binder para Azure Service Bus

Conceptos clave

Spring Cloud Stream Binder para Azure Service Bus proporciona la implementación de enlace para Spring Cloud Stream Framework. Esta implementación usa adaptadores de canal de Spring Integration Service Bus en su base.

Mensaje programado

Este enlazador admite el envío de mensajes a un tema para el procesamiento retrasado. Los usuarios pueden enviar mensajes programados con encabezado x-delay que expresa en milisegundos un tiempo de retraso para el mensaje. El mensaje se entregará a los temas respectivos después x-delay de milisegundos.

Grupo de consumidores

El tema de Service Bus proporciona compatibilidad similar al grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Este enlazador se basa en Subscription un tema para actuar como un grupo de consumidores.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Como alternativa, también puede usar Spring Cloud Azure Stream Service Bus Starter, como se muestra en el ejemplo siguiente de Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuración

El enlazador proporciona las dos partes siguientes de las opciones de configuración:

propiedades de configuración de Conectar ion

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.

Nota:

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Conectar las propiedades configurables de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Descripción
spring.cloud.azure.servicebus.enabled boolean Indica si una instancia de Azure Service Bus está habilitada.
spring.cloud.azure.servicebus.connection-string Cadena Espacio de nombres de Service Bus cadena de conexión valor.
espacio de nombres spring.cloud.azure.servicebus.namespace Cadena Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Cadena Nombre de dominio de un valor de espacio de nombres de Azure Service Bus.

Nota:

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Spring Cloud Azure Stream Service Bus. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure. unificado o el prefijo de spring.cloud.azure.servicebus..

El enlazador también admite Spring Could Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar el cadena de conexión con entidades de seguridad que no se conceden con Data roles relacionados, consulte la sección Uso básico de Spring Could Azure Resource Manager.

Propiedades de configuración de enlace de Azure Service Bus

Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.

Propiedades del consumidor

Estas propiedades se exponen a través de ServiceBusConsumerProperties.

Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Valor predeterminado Descripción
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean false Si los mensajes con errores se enrutan a dlq.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Entero 1 Número máximo de mensajes simultáneos que debe procesar el cliente del procesador de Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Entero nulo Número máximo de sesiones simultáneas que se van a procesar en un momento dado.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booleano nulo Indica si la sesión está habilitada.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Entero 0 Recuento de capturas previas del cliente de procesador de Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue None Tipo de la sub cola a la que se va a conectar.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duration 5 m Cantidad de tiempo para continuar la renovación automática del bloqueo.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Modo de recepción del cliente de procesador de Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.autocompletar Boolean true Si se deben liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de mensaje de Checkpointer para permitir a los desarrolladores liquidar los mensajes manualmente.
Configuración avanzada del consumidor

La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Propiedades del productor

Estas propiedades se exponen a través de ServiceBusProducerProperties.

Propiedades configurables del productor de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Valor predeterminado Descripción
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean false Marca switch para la sincronización del productor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10 000 Valor de tiempo de espera para el envío del productor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nulo Tipo de entidad de Service Bus del productor, necesario para el productor de enlace.

Importante

Al usar el productor de enlace, se requiere configurar la propiedad de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type .

Configuración avanzada del productor

La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada productor de enlazador, que puede configurar con el prefijo spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Uso básico

Envío y recepción de mensajes desde o a Service Bus

  1. Rellene las opciones de configuración con información de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Defina proveedor y consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Compatibilidad con claves de partición

El enlazador admite la creación de particiones de Service Bus al permitir establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.

Spring Cloud Stream proporciona una propiedad spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionde expresión spEL de clave de partición . Por ejemplo, establezca esta propiedad como "'partitionKey-' + headers[<message-header-key>]" y agregue un encabezado denominado message-header-key. Spring Cloud Stream usa el valor de este encabezado al evaluar la expresión para asignar una clave de partición. El código siguiente proporciona un productor de ejemplo:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Compatibilidad con sesiones

El enlazador admite sesiones de mensajes de Service Bus. El identificador de sesión de un mensaje se puede establecer a través del encabezado del mensaje.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nota:

Según la creación de particiones de Service Bus, el identificador de sesión tiene mayor prioridad que la clave de partición. Por lo tanto, cuando se establecen los ServiceBusMessageHeaders#SESSION_ID encabezados y ServiceBusMessageHeaders#PARTITION_KEY , el valor del identificador de sesión se usa finalmente para sobrescribir el valor de la clave de partición.

Control de los mensajes de error

  • Control de mensajes de error de enlace de salida

    De forma predeterminada, Spring Integration crea un canal de error global denominado errorChannel. Configure el siguiente punto de conexión de mensaje para controlar el mensaje de error de enlace de salida.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Control de mensajes de error de enlace de entrada

    Spring Cloud Stream Service Bus Binder admite tres soluciones para controlar los errores de los enlaces de mensajes entrantes: el controlador de errores del enlazador, los canales de error personalizados y los controladores.

    Controlador de errores del enlazador:

    El controlador de errores del enlazador predeterminado controla el enlace de entrada. Este controlador se usa para enviar mensajes con errores a la cola de mensajes fallidos cuando spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected está habilitado. De lo contrario, los mensajes con error se abandonan. Excepto para configurar el canal de error específico del enlace, el controlador de errores del enlazador siempre surte efecto independientemente de si hay otros controladores de errores personalizados o canales.

    Canal de error:

    Spring Cloud Stream proporciona un canal de error para cada enlace entrante. ErrorMessage Se envía al canal de error. Para más información, consulte Control de errores en la documentación de Spring Cloud Stream.

    • Canal de error predeterminado

      Puede usar un canal de error global denominado errorChannel para consumir todos los mensajes de error de enlace entrantes. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Canal de error específico del enlace

      Puede usar un canal de error específico para consumir los mensajes de error de enlace entrante específicos con una prioridad más alta que el canal de error predeterminado. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Nota:

      El canal de error específico del enlace se excluye mutuamente con otros controladores de errores y canales proporcionados.

    Controlador de errores:

    Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado mediante la adición de un Consumer que acepta ErrorMessage instancias. Para obtener más información, consulte Control de errores en la documentación de Spring Cloud Stream.

    Nota:

    Cuando se configura cualquier controlador de errores de enlace, puede funcionar con el canal de error predeterminado y el controlador de errores del enlazador.

    • Controlador de errores predeterminado de enlace

      Configure un único Consumer bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la spring.cloud.stream.default.error-handler-definition propiedad en el nombre de la función.

    • Controlador de errores específico del enlace

      Configure un Consumer bean para consumir los mensajes de error de enlace de entrada específicos. La función siguiente se suscribe al canal de error de enlace de entrada específico con una prioridad más alta que el controlador de errores predeterminado de enlace.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition propiedad en el nombre de la función.

Encabezados de mensaje de Service Bus

Para ver los encabezados de mensaje básicos admitidos, consulte la sección Encabezados de mensaje de Service Bus de Spring Cloud Soporte técnico de Azure for Spring Integration.

Nota:

Al establecer la clave de partición, la prioridad del encabezado del mensaje es mayor que la propiedad Spring Cloud Stream. Por lo tanto spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression , solo surte efecto cuando no se configura ninguno de los ServiceBusMessageHeaders#SESSION_ID encabezados y ServiceBusMessageHeaders#PARTITION_KEY .

Compatibilidad con varios enlazador

Conectar ion a varios espacios de nombres de Service Bus también se admite mediante el uso de varios enlazadores. Este ejemplo toma cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas, los usuarios pueden establecer propiedades relacionadas en la configuración del entorno de cada enlazador.

  1. Para usar varios enlazadores de ServiceBus, configure las siguientes propiedades en el archivo application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota:

    El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. necesitamos definir dos proveedores y dos consumidores

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Aprovisionamiento de recursos

El enlazador de Service Bus admite el aprovisionamiento de colas, temas y suscripciones, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

Ejemplos

Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.