Compartir vía


Compatibilidad de Spring Cloud azure con Spring Cloud Stream

Este artículo se aplica a:✅ versión 4.19.0 ✅ versión 5.19.0

Spring Cloud Stream es un marco para crear microservicios basados en eventos altamente escalables conectados con sistemas de mensajería compartidos.

El marco proporciona un modelo de programación flexible basado en expresiones y procedimientos recomendados de Spring ya establecidos y conocidos. Estos procedimientos recomendados incluyen compatibilidad con la semántica persistente de pub/sub, grupos de consumidores y particiones con estado.

Las implementaciones actuales del enlazador incluyen:

Spring Cloud Stream Binder para Azure Event Hubs

Conceptos clave

Spring Cloud Stream Binder para Azure Event Hubs proporciona la implementación de enlace para el marco de Spring Cloud Stream. Esta implementación usa adaptadores de canal de Spring Integration Event Hubs en su base. Desde la perspectiva del diseño, Event Hubs es similar a Kafka. Además, se puede acceder a Event Hubs a través de la API de Kafka. Si el proyecto tiene una dependencia estrecha de la API de Kafka, puede probar Centro de eventos con la API de Kafka

Grupo de consumidores

Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.

Compatibilidad con la creación de particiones

Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué consumidor posee la partición. Cuando se inicia un nuevo consumidor, intenta robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.

Para especificar la estrategia de equilibrio de carga, se proporcionan las propiedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Para obtener más información, consulte la sección Propiedades del consumidor.

Compatibilidad con consumidores de Batch

El enlazador de Azure Stream Event Hubs de Spring Cloud admite característica de consumidor de Batch de Spring Cloud Stream.

Para trabajar con el modo de consumidor por lotes, establezca la propiedad spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode en true. Cuando está habilitado, se recibe un mensaje con una carga de una lista de eventos por lotes y se pasa a la función Consumer. Cada encabezado de mensaje también se convierte en una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola se presentan como un valor único porque todo el lote de eventos comparte el mismo valor. Para obtener más información, consulte la sección encabezados de mensaje de Event Hubs de compatibilidad de Azure de Spring Cloud con Spring Integration.

Nota

El encabezado de punto de control solo existe cuando se usa el modo de punto de comprobación de MANUAL.

El punto de comprobación del consumidor por lotes admite dos modos: BATCH y MANUAL. BATCH modo es un modo de punto de comprobación automático para controlar todo el lote de eventos juntos una vez que el enlazador los recibe. MANUAL modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, el Checkpointer se pasa al encabezado del mensaje y los usuarios pueden usarlo para realizar puntos de control.

Puede especificar el tamaño del lote estableciendo las propiedades max-size y max-wait-time que tienen un prefijo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. La propiedad max-size es necesaria y la propiedad max-wait-time es opcional. Para obtener más información, consulte la sección Propiedades del consumidor.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Como alternativa, también puede usar Spring Cloud Azure Stream Event Hubs Starter, como se muestra en el ejemplo siguiente de Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuración

El enlazador proporciona las tres partes siguientes de las opciones de configuración:

Propiedades de configuración de conexión

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.

Nota

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Propiedades configurables de conexión de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.enabled booleano Si una instancia de Azure Event Hubs está habilitada.
spring.cloud.azure.eventhubs.connection-string Cuerda Valor de cadena de conexión del espacio de nombres de Event Hubs.
spring.cloud.azure.eventhubsespacio de nombres .namespace Cuerda Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Cuerda Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Cuerda Dirección del punto de conexión personalizado.

Propina

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Azure Stream Event Hubs de Spring Cloud. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure. o el prefijo de spring.cloud.azure.eventhubs..

El enlazador también admite Spring Podría Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar la cadena de conexión con entidades de seguridad que no se conceden con roles relacionados con , consulte la sección uso básico de de Spring Podría Azure Resource Manager.

Propiedades de configuración del punto de control

Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.

Nota

Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará ningún contenedor de almacenamiento automáticamente con el nombre de spring.cloud.stream.bindings.binding-name.destination.

Propiedades configurables de puntos de control de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Si quiere permitir la creación de contenedores si no existe.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Cuerda Nombre de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Cuerda Clave de acceso de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Cuerda Nombre del contenedor de almacenamiento.

Propina

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure. o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propiedades de configuración de enlace de Azure Event Hubs

Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.

Propiedades del consumidor

Estas propiedades se exponen a través de EventHubsConsumerProperties.

Nota

Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Event Hubs admite la configuración de valores para todos los canales, en el formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Modo de punto de control usado cuando el consumidor decide cómo controlar el mensaje
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Entero Decide la cantidad de mensaje de cada partición para realizar un punto de control. Solo surtirá efecto cuando se use PARTITION_COUNT modo de punto de comprobación.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duración Decide el intervalo de tiempo para realizar un punto de control. Solo surtirá efecto cuando se use TIME modo de punto de comprobación.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Entero Número máximo de eventos en un lote. Necesario para el modo de consumidor por lotes.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duración Duración máxima de tiempo para el consumo por lotes. Solo surtirá efecto cuando el modo de consumidor por lotes esté habilitado y sea opcional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duración Duración del intervalo para la actualización.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Estrategia de equilibrio de carga.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duración Duración de tiempo después de la cual expira la propiedad de la partición.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleano Si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Entero Recuento usado por el consumidor para controlar el número de eventos que el consumidor del centro de eventos recibirá y pondrá en cola de forma activa local.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Asignación con la clave como identificador de partición y valores de StartPositionProperties Mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en el almacén de puntos de control. Este mapa se clave fuera del identificador de partición.

Nota

La configuración de initial-partition-event-position acepta un map para especificar la posición inicial de cada centro de eventos. Por lo tanto, su clave es el identificador de partición y el valor es de StartPositionProperties, que incluye propiedades de desplazamiento, número de secuencia, fecha y hora de puesta en cola y si incluyen. Por ejemplo, puede establecerlo como

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuración avanzada del consumidor

La conexión de anterior, punto de controly cliente común de Azure SDK la personalización de la configuración para cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Propiedades del productor

Estas propiedades se exponen a través de EventHubsProducerProperties.

Nota

Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Event Hubs admite la configuración de valores para todos los canales, en el formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Propiedades configurables del productor de spring-cloud-azure-stream-binder-eventhubs:

Propiedad Tipo Descripción
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booleano Marca de conmutador para la sincronización del productor. Si es true, el productor esperará una respuesta después de una operación de envío.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout largo Cantidad de tiempo para esperar una respuesta después de una operación de envío. Solo surtirá efecto cuando un productor de sincronización esté habilitado.
Configuración avanzada del productor

El de conexión de anterior y cliente común de Azure SDK la configuración admite la personalización de cada productor de enlazador, que puede configurar con el prefijo .

Uso básico

Envío y recepción de mensajes desde y hacia Event Hubs

  1. Rellene las opciones de configuración con información de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Defina proveedor y consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Compatibilidad con la creación de particiones

Se crea un PartitionSupplier con información de partición proporcionada por el usuario para configurar la información de partición sobre el mensaje que se va a enviar. En el diagrama de flujo siguiente se muestra el proceso de obtención de diferentes prioridades para el identificador y la clave de partición:

Diagrama que muestra un diagrama de flujo del proceso de compatibilidad de creación de particiones.

Compatibilidad con consumidores de Batch

  1. Proporcione las opciones de configuración por lotes, como se muestra en el ejemplo siguiente:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Defina proveedor y consumidor.

    Para el modo de punto de comprobación como BATCH, puede usar el código siguiente para enviar mensajes y consumir en lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Para el modo de punto de comprobación como MANUAL, puede usar el código siguiente para enviar mensajes y consumir o punto de control en lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nota

En el modo de consumo por lotes, el tipo de contenido predeterminado del enlazador de Spring Cloud Stream es application/json, por lo que asegúrese de que la carga del mensaje esté alineada con el tipo de contenido. Por ejemplo, al usar el tipo de contenido predeterminado de application/json para recibir mensajes con String carga, la carga debe estar JSON String, rodeada de comillas dobles para el texto de String original. Aunque para text/plain tipo de contenido, puede ser un objeto String directamente. Para obtener más información, consulte negociación de tipos de contenido de Spring Cloud Stream.

Control de mensajes de error

  • Control de mensajes de error de enlace de salida

    De forma predeterminada, Spring Integration crea un canal de error global denominado errorChannel. Configure el siguiente punto de conexión de mensaje para controlar los mensajes de error de enlace de salida.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Control de mensajes de error de enlace de entrada

    Spring Cloud Stream Event Hubs Binder admite una solución para controlar los errores de los enlaces de mensajes entrantes: controladores de errores.

    controlador de errores:

    Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado agregando un Consumer que acepta instancias de ErrorMessage. Para obtener más información, consulte Controlar mensajes de error en la documentación de Spring Cloud Stream.

    • Controlador de errores predeterminado de enlace

      Configure un único Consumer bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la propiedad spring.cloud.stream.default.error-handler-definition en el nombre de la función.

    • Controlador de errores específico del enlace

      Configure un Consumer bean para consumir los mensajes de error de enlace de entrada específicos. La siguiente función se suscribe al canal de error de enlace de entrada específico y tiene una prioridad más alta que el controlador de errores predeterminado de enlace:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la propiedad spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition en el nombre de la función.

Encabezados de mensaje de Event Hubs

Para conocer los encabezados de mensaje básicos admitidos, consulte la sección encabezados de mensaje de Event Hubs de compatibilidad de Azure de Spring Cloud con Spring Integration.

Compatibilidad con varios enlazador

También se admite la conexión a varios espacios de nombres de Event Hubs mediante varios enlazadores. En este ejemplo se toma una cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas. Puede establecer propiedades relacionadas en la configuración del entorno de cada enlazador.

  1. Para usar varios enlazadores con Event Hubs, configure las siguientes propiedades en el archivo application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Necesitamos definir dos proveedores y dos consumidores:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Aprovisionamiento de recursos

El enlazador de Event Hubs admite el aprovisionamiento del centro de eventos y el grupo de consumidores, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

Muestras

Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.

Spring Cloud Stream Binder para Azure Service Bus

Conceptos clave

Spring Cloud Stream Binder para Azure Service Bus proporciona la implementación de enlace para Spring Cloud Stream Framework. Esta implementación usa adaptadores de canal de Spring Integration Service Bus en su base.

Mensaje programado

Este enlazador admite el envío de mensajes a un tema para el procesamiento retrasado. Los usuarios pueden enviar mensajes programados con encabezado x-delay expresar en milisegundos un tiempo de retraso para el mensaje. El mensaje se entregará a los temas respectivos después de x-delay milisegundos.

Grupo de consumidores

El tema de Service Bus proporciona compatibilidad similar al grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Este enlazador se basa en Subscription de un tema para actuar como un grupo de consumidores.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Como alternativa, también puede usar Spring Cloud Azure Stream Service Bus Starter, como se muestra en el ejemplo siguiente de Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuración

El enlazador proporciona las dos partes siguientes de las opciones de configuración:

Propiedades de configuración de conexión

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.

Nota

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Propiedades configurables de conexión de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Descripción
spring.cloud.azure.servicebus.enabled booleano Indica si una instancia de Azure Service Bus está habilitada.
spring.cloud.azure.servicebus.connection-string Cuerda Valor de cadena de conexión del espacio de nombres de Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Cuerda Dirección del punto de conexión personalizado que se va a usar al conectarse a Service Bus.
espacio de nombres spring.cloud.azure.servicebus.namespace Cuerda Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Cuerda Nombre de dominio de un valor de espacio de nombres de Azure Service Bus.

Nota

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Spring Cloud Azure Stream Service Bus. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure. o el prefijo de spring.cloud.azure.servicebus..

El enlazador también admite Spring Podría Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar la cadena de conexión con entidades de seguridad que no se conceden con roles relacionados con , consulte la sección uso básico de de Spring Podría Azure Resource Manager.

Propiedades de configuración de enlace de Azure Service Bus

Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.

Propiedades del consumidor

Estas propiedades se exponen a través de ServiceBusConsumerProperties.

Nota

Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Service Bus admite valores de configuración para todos los canales, en el formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Predeterminado Descripción
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booleano falso Si los mensajes con errores se enrutan a dlq.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Entero 1 Número máximo de mensajes simultáneos que debe procesar el cliente del procesador de Service Bus. Cuando la sesión está habilitada, se aplica a cada sesión.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Entero nulo Número máximo de sesiones simultáneas que se van a procesar en un momento dado.
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session Booleano nulo Indica si la sesión está habilitada.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Entero 0 Recuento de capturas previas del cliente de procesador de Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue ninguno Tipo de la sub cola a la que se va a conectar.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duración 5 m Cantidad de tiempo para continuar la renovación automática del bloqueo.
spring.cloud.stream.servicebus.bindings.binding-name.consumermodo de recepción ServiceBusReceiveMode peek_lock Modo de recepción del cliente de procesador de Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.autocompletar Booleano verdadero Si se deben liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de mensaje de Checkpointer para permitir a los desarrolladores liquidar los mensajes manualmente.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes long 1024 Tamaño máximo de la cola o tema en megabytes, que es el tamaño de la memoria asignada para la cola o tema.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Duración P10675199DT2H48M5.4775807S. (10675199 días, 2 horas, 48 minutos, 5 segundos y 477 milisegundos) Duración después de la cual expira el mensaje, comenzando desde cuándo se envía el mensaje a Service Bus.

Importante

Al usar el azure Resource Manager (ARM), debe configurar la propiedad spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Para obtener más información, consulte el ejemplo servicebus-queue-binder-arm en GitHub.

Configuración avanzada del consumidor

Los de conexión anteriores y cliente común de Azure SDK la configuración admiten la personalización de cada consumidor del enlazador, que puede configurar con el prefijo .

Propiedades del productor

Estas propiedades se exponen a través de ServiceBusProducerProperties.

Nota

Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Service Bus admite valores de configuración para todos los canales, en el formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Propiedades configurables del productor de spring-cloud-azure-stream-binder-servicebus:

Propiedad Tipo Predeterminado Descripción
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booleano falso Marca switch para la sincronización del productor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout largo 10000 Valor de tiempo de espera para el envío del productor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nulo Tipo de entidad de Service Bus del productor, necesario para el productor de enlace.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes long 1024 Tamaño máximo de la cola o tema en megabytes, que es el tamaño de la memoria asignada para la cola o tema.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Duración P10675199DT2H48M5.4775807S. (10675199 días, 2 horas, 48 minutos, 5 segundos y 477 milisegundos) Duración después de la cual expira el mensaje, comenzando desde cuándo se envía el mensaje a Service Bus.

Importante

Al usar el productor de enlaces, se requiere configurar la propiedad de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Configuración avanzada del productor

El de conexión de anterior y cliente común de Azure SDK la configuración admite la personalización de cada productor de enlazador, que puede configurar con el prefijo .

Uso básico

Envío y recepción de mensajes desde o a Service Bus

  1. Rellene las opciones de configuración con información de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Defina proveedor y consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Compatibilidad con claves de partición

El enlazador admite la creación de particiones de Service Bus, ya que permite establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.

Spring Cloud Stream proporciona una propiedad de expresión spEL de clave de partición spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Por ejemplo, establezca esta propiedad como "'partitionKey-' + headers[<message-header-key>]" y agregue un encabezado denominado message-header-key. Spring Cloud Stream usa el valor de este encabezado al evaluar la expresión para asignar una clave de partición. El código siguiente proporciona un productor de ejemplo:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Compatibilidad con sesiones

El enlazador admite sesiones de mensajes de Service Bus. El identificador de sesión de un mensaje se puede establecer a través del encabezado del mensaje.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nota

Según creación de particiones de Service Bus, el identificador de sesión tiene una prioridad más alta que la clave de partición. Por lo tanto, cuando se establecen los encabezados ServiceBusMessageHeaders#SESSION_ID y ServiceBusMessageHeaders#PARTITION_KEY, el valor del identificador de sesión se usa finalmente para sobrescribir el valor de la clave de partición.

Control de mensajes de error

  • Control de mensajes de error de enlace de salida

    De forma predeterminada, Spring Integration crea un canal de error global denominado errorChannel. Configure el siguiente punto de conexión de mensaje para controlar el mensaje de error de enlace de salida.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Control de mensajes de error de enlace de entrada

    Spring Cloud Stream Service Bus Binder admite dos soluciones para controlar los errores de los enlaces de mensajes entrantes: el controlador de errores del enlazador y los controladores.

    controlador de errores del enlazador:

    El controlador de errores del enlazador predeterminado controla el enlace de entrada. Este controlador se usa para enviar mensajes con errores a la cola de mensajes fallidos cuando se habilita spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected. De lo contrario, los mensajes con error se abandonan. El controlador de errores del enlazador se excluye mutuamente con otros controladores de errores proporcionados.

    controlador de errores:

    Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado agregando un Consumer que acepta instancias de ErrorMessage. Para obtener más información, consulte Controlar mensajes de error en la documentación de Spring Cloud Stream.

    • Controlador de errores predeterminado de enlace

      Configure un único Consumer bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la propiedad spring.cloud.stream.default.error-handler-definition en el nombre de la función.

    • Controlador de errores específico del enlace

      Configure un Consumer bean para consumir los mensajes de error de enlace de entrada específicos. La función siguiente se suscribe al canal de error de enlace de entrada específico con una prioridad más alta que el controlador de errores predeterminado de enlace.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      También debe establecer la propiedad spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition en el nombre de la función.

Encabezados de mensaje de Service Bus

Para conocer los encabezados de mensaje básicos admitidos, consulte la sección encabezados de mensaje de Service Bus de compatibilidad de Spring Cloud con Azure para Spring Integration.

Nota

Al establecer la clave de partición, la prioridad del encabezado del mensaje es mayor que la propiedad Spring Cloud Stream. Por lo tanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression surte efecto solo cuando no se configura ninguno de los encabezados ServiceBusMessageHeaders#SESSION_ID y ServiceBusMessageHeaders#PARTITION_KEY.

Compatibilidad con varios enlazador

También se admite la conexión a varios espacios de nombres de Service Bus mediante varios enlazadores. En este ejemplo se toma la cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas, los usuarios pueden establecer propiedades relacionadas en la configuración del entorno de cada enlazador.

  1. Para usar varios enlazadores de ServiceBus, configure las siguientes propiedades en el archivo application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. necesitamos definir dos proveedores y dos consumidores

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Aprovisionamiento de recursos

El enlazador de Service Bus admite el aprovisionamiento de colas, temas y suscripciones, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

Personalización de las propiedades del cliente de Service Bus

Los desarrolladores pueden usar AzureServiceClientBuilderCustomizer para personalizar las propiedades del cliente de Service Bus. En el ejemplo siguiente se personaliza la propiedad sessionIdleTimeout en ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Muestras

Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.