Compartir vía


Compatibilidad de Spring Cloud con Spring Integration

Este artículo se aplica a:✅ versión 4.19.0 ✅ versión 5.19.0

La extensión de integración de Spring para Azure proporciona adaptadores de integración de Spring para los distintos servicios proporcionados por el SDK de Azure para Java. Proporcionamos compatibilidad con Spring Integration para estos servicios de Azure: Event Hubs, Service Bus, Storage Queue. A continuación se muestra una lista de adaptadores admitidos:

Integración de Spring con Azure Event Hubs

Conceptos clave

Azure Event Hubs es una plataforma de streaming de macrodatos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo. Los datos enviados a un centro de eventos se pueden transformar y almacenar mediante cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes o almacenamiento.

Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos. Esos adaptadores proporcionan un nivel superior de abstracción a través de la compatibilidad de Spring con la comunicación remota, la mensajería y la programación. El proyecto de extensión Spring Integration for Event Hubs proporciona adaptadores de canal entrantes y salientes y puertas de enlace para Azure Event Hubs.

Nota

Las API de soporte técnico de RxJava se quitan de la versión 4.0.0. Consulte Javadoc para obtener más información.

Grupo de consumidores

Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.

Compatibilidad con la creación de particiones

Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué partición es propiedad del consumidor. Cuando se inicia un nuevo consumidor, intentará robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.

Para especificar la estrategia de equilibrio de carga, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties.

Compatibilidad con consumidores de Batch

El EventHubsInboundChannelAdapter admite el modo de consumo por lotes. Para habilitarlo, los usuarios pueden especificar el modo de escucha como ListenerMode.BATCH al construir una instancia de EventHubsInboundChannelAdapter. Cuando está habilitada, se recibirá una mensaje de la que la carga es una lista de eventos por lotes y se pasará al canal de bajada. Cada encabezado de mensaje también se convierte como una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Para los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola, se presentan como un valor único para todo el lote de eventos que comparten el mismo. Para obtener más información, consulte la sección encabezados de mensajes de Event Hubs.

Nota

El encabezado de punto de control solo existe cuando se usa modo manual punto de control.

El punto de comprobación del consumidor por lotes admite dos modos: BATCH y MANUAL. BATCH modo es un modo de punto de comprobación automático para controlar todo el lote de eventos una vez recibidos. MANUAL modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, el Checkpointer se pasará al encabezado del mensaje y los usuarios podrían usarlo para realizar puntos de control.

La directiva de consumo por lotes se puede especificar mediante propiedades de max-size y max-wait-time, donde max-size es una propiedad necesaria mientras max-wait-time es opcional. Para especificar la estrategia de consumo por lotes, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes tres partes de las opciones de configuración:

Propiedades de configuración de conexión

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.

Nota

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Propiedades configurables de conexión de spring-cloud-azure-starter-integration-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.enabled booleano Si una instancia de Azure Event Hubs está habilitada.
spring.cloud.azure.eventhubs.connection-string Cuerda Valor de cadena de conexión del espacio de nombres de Event Hubs.
spring.cloud.azure.eventhubsespacio de nombres .namespace Cuerda Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Cuerda Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Cuerda Dirección del punto de conexión personalizado.
spring.cloud.azure.eventhubs.shared-connection Booleano Si EventProcessorClient y EventHubProducerAsyncClient subyacente usan la misma conexión. De forma predeterminada, se crea una nueva conexión y se usa para cada cliente del centro de eventos creado.

Propiedades de configuración del punto de control

Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.

Nota

Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará automáticamente ningún contenedor de almacenamiento.

Propiedades configurables de punto de control de spring-cloud-azure-starter-integration-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Si quiere permitir la creación de contenedores si no existe.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Cuerda Nombre de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Cuerda Clave de acceso de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Cuerda Nombre del contenedor de almacenamiento.

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure. o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propiedades de configuración del procesador del centro de eventos

El EventHubsInboundChannelAdapter usa el EventProcessorClient para consumir mensajes de un centro de eventos, para configurar las propiedades generales de un EventProcessorClient, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente sobre cómo trabajar con EventHubsInboundChannelAdapter.

Uso básico

Envío de mensajes a Azure Event Hubs

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el EventHubsTemplate bean para enviar mensajes a Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recepción de mensajes de Azure Event Hubs

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree EventHubsInboundChannelAdapter con el EventHubsMessageListenerContainer bean para recibir mensajes de Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Cree un enlace de receptor de mensajes con EventHubsInboundChannelAdapter a través del canal de mensajes creado antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configuración de EventHubsMessageConverter para personalizar objectMapper

EventHubsMessageConverter se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper.

Compatibilidad con consumidores de Batch

Para consumir mensajes de Event Hubs en lotes es similar al ejemplo anterior, además de que los usuarios deben establecer las opciones de configuración relacionadas que consumen por lotes para EventHubsInboundChannelAdapter.

Al crear EventHubsInboundChannelAdapter, el modo de escucha debe establecerse como BATCH. Cuando cree bean de EventHubsMessageListenerContainer, establezca el modo de punto de comprobación como MANUAL o BATCH, y las opciones de lote se pueden configurar según sea necesario.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Encabezados de mensaje de Event Hubs

En la tabla siguiente se muestra cómo se asignan las propiedades del mensaje de Event Hubs a los encabezados de mensaje de Spring. Para Azure Event Hubs, se llama al mensaje como event.

Asignación entre las propiedades de evento o mensaje de Event Hubs y los encabezados de mensaje de Spring en modo de agente de escucha de registros:

Propiedades del evento de Event Hubs Constantes de encabezado de mensaje de Spring Tipo Descripción
Hora puesta en cola EventHubsHeaders#ENQUEUED_TIME Instante El instante, en UTC, de cuándo se puso en cola el evento en la partición del centro de eventos.
Compensar EventHubsHeaders#OFFSET Largo Desplazamiento del evento cuando se recibió de la partición del centro de eventos asociada.
Clave de partición AzureHeaders#PARTITION_KEY Cuerda Clave hash de partición si se estableció al publicar originalmente el evento.
Id. de partición AzureHeaders#RAW_PARTITION_ID Cuerda Identificador de partición del centro de eventos.
Número de secuencia EventHubsHeaders#SEQUENCE_NUMBER Largo Número de secuencia asignado al evento cuando se puso en cola en la partición del centro de eventos asociado.
Últimas propiedades de evento en cola EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Propiedades del último evento en cola en esta partición.
NA AzureHeaders#CHECKPOINTER Checkpointer Encabezado del punto de control del mensaje específico.

Los usuarios pueden analizar los encabezados de mensaje para obtener la información relacionada de cada evento. Para establecer un encabezado de mensaje para el evento, todos los encabezados personalizados se colocarán como una propiedad de aplicación de un evento, donde el encabezado se establece como clave de propiedad. Cuando se reciben eventos de Event Hubs, todas las propiedades de la aplicación se convertirán en el encabezado del mensaje.

Nota

No se admiten los encabezados de mensaje de clave de partición, tiempo en cola, desplazamiento y número de secuencia para establecerse manualmente.

Cuando el modo de consumidor por lotes está habilitado, los encabezados específicos de los mensajes por lotes se enumeran a continuación, que contiene una lista de valores de cada evento de Event Hubs único.

Asignación entre event Hubs Message/Event Properties y Spring Message Headers en el modo de agente de escucha por lotes:

Propiedades del evento de Event Hubs Constantes de encabezado de mensaje de Spring Batch Tipo Descripción
Hora puesta en cola EventHubsHeaders#ENQUEUED_TIME Lista de instantáneas Lista del instante, en UTC, de cuándo se puso en cola cada evento en la partición del centro de eventos.
Compensar EventHubsHeaders#OFFSET Lista de long Lista del desplazamiento de cada evento cuando se recibió de la partición del centro de eventos asociada.
Clave de partición AzureHeaders#PARTITION_KEY Lista de cadenas Lista de la clave de hash de partición si se estableció al publicar originalmente cada evento.
Número de secuencia EventHubsHeaders#SEQUENCE_NUMBER Lista de long Lista del número de secuencia asignado a cada evento cuando se puso en cola en la partición del centro de eventos asociada.
Propiedades del sistema EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista de mapa Lista de las propiedades del sistema de cada evento.
Propiedades de la aplicación EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista de mapa Lista de las propiedades de la aplicación de cada evento, donde se colocan todos los encabezados de mensaje personalizados o propiedades de evento.

Nota

Al publicar mensajes, todos los encabezados por lotes anteriores se quitarán de los mensajes si existen.

Muestras

Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.

Integración de Spring con Azure Service Bus

Conceptos clave

Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos.

El proyecto de extensión Spring Integration for Azure Service Bus proporciona adaptadores de canal entrantes y salientes para Azure Service Bus.

Nota

Las API de soporte técnico completableFuture han quedado en desuso de la versión 2.10.0 y se reemplazan por Reactor Core de la versión 4.0.0. Consulte Javadoc para obtener más información.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes 2 partes de las opciones de configuración:

Propiedades de configuración de conexión

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.

Nota

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Propiedades configurables de conexión de spring-cloud-azure-starter-integration-servicebus:

Propiedad Tipo Descripción
spring.cloud.azure.servicebus.enabled booleano Indica si una instancia de Azure Service Bus está habilitada.
spring.cloud.azure.servicebus.connection-string Cuerda Valor de cadena de conexión del espacio de nombres de Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Cuerda Dirección del punto de conexión personalizado que se va a usar al conectarse a Service Bus.
espacio de nombres spring.cloud.azure.servicebus.namespace Cuerda Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Cuerda Nombre de dominio de un valor de espacio de nombres de Azure Service Bus.

Propiedades de configuración del procesador de Service Bus

El ServiceBusInboundChannelAdapter usa el ServiceBusProcessorClient para consumir mensajes, para configurar las propiedades generales de un ServiceBusProcessorClient, los desarrolladores pueden usar ServiceBusContainerProperties para la configuración. Consulte la sección siguiente sobre cómo trabajar con ServiceBusInboundChannelAdapter.

Uso básico

Envío de mensajes a Azure Service Bus

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el ServiceBusTemplate bean para enviar mensajes a Service Bus y establezca el tipo de entidad para ServiceBusTemplate. En este ejemplo se toma la cola de Service Bus como ejemplo.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recepción de mensajes de Azure Service Bus

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree ServiceBusInboundChannelAdapter con el ServiceBusMessageListenerContainer bean para recibir mensajes en Service Bus. En este ejemplo se toma la cola de Service Bus como ejemplo.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Cree un enlace de receptor de mensajes con ServiceBusInboundChannelAdapter a través del canal de mensajes que creamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configuración de ServiceBusMessageConverter para personalizar objectMapper

ServiceBusMessageConverter se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper.

Encabezados de mensaje de Service Bus

Para algunos encabezados de Service Bus que se pueden asignar a varias constantes de encabezado de Spring, se muestra la prioridad de distintos encabezados de Spring.

Asignación entre encabezados de Service Bus y encabezados de Spring:

Encabezados y propiedades del mensaje de Service Bus Constantes de encabezado de mensaje spring Tipo Configurable Descripción
Tipo de contenido MessageHeaders#CONTENT_TYPE Cuerda Descriptor de tipo de contenido RFC2045 del mensaje.
Id. de correlación ServiceBusMessageHeaders#CORRELATION_ID Cuerda Identificador de correlación del mensaje
Id. de mensaje ServiceBusMessageHeaders#MESSAGE_ID Cuerda El identificador de mensaje del mensaje, este encabezado tiene mayor prioridad que MessageHeaders#ID.
Id. de mensaje MessageHeaders#ID UUID El identificador de mensaje del mensaje, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#MESSAGE_ID.
Clave de partición ServiceBusMessageHeaders#PARTITION_KEY Cuerda Clave de partición para enviar el mensaje a una entidad con particiones.
Responder a MessageHeaders#REPLY_CHANNEL Cuerda Dirección de una entidad a la que se van a enviar respuestas.
Responder al identificador de sesión ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Cuerda Valor de la propiedad ReplyToGroupId del mensaje.
Hora de puesta en cola programada utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Fecha y hora en la que se debe poner el mensaje en cola en Service Bus, este encabezado tiene mayor prioridad que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Hora de puesta en cola programada utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Entero Fecha y hora en la que se debe poner en cola el mensaje en Service Bus, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Identificador de sesión ServiceBusMessageHeaders#SESSION_ID Cuerda Identificador de sesión para una entidad compatible con sesión.
Período de vida ServiceBusMessageHeaders#TIME_TO_LIVE Duración Duración del tiempo antes de que expire este mensaje.
Para ServiceBusMessageHeaders#TO Cuerda La dirección "to" del mensaje, reservada para su uso futuro en escenarios de enrutamiento y actualmente ignorada por el propio agente.
Asunto ServiceBusMessageHeaders#SUBJECT Cuerda Asunto del mensaje.
Descripción del error de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Cuerda No La descripción de un mensaje que se ha escrito con mensajes fallidos.
Motivo de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_REASON Cuerda No La razón por la que se ha producido un mensaje fallido.
Origen de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Cuerda No La entidad en la que el mensaje estaba en mensajes fallidos.
Recuento de entregas ServiceBusMessageHeaders#DELIVERY_COUNT largo No Número de veces que este mensaje se entregó a los clientes.
Número de secuencia en cola ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER largo No Número de secuencia en cola asignado a un mensaje por Service Bus.
Hora puesta en cola ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime No Fecha y hora en la que se puso en cola este mensaje en Service Bus.
Expira en ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime No Fecha y hora en la que expirará este mensaje.
Token de bloqueo ServiceBusMessageHeaders#LOCK_TOKEN Cuerda No Token de bloqueo del mensaje actual.
Bloqueado hasta ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime No Fecha y hora en la que expira el bloqueo de este mensaje.
Número de secuencia ServiceBusMessageHeaders#SEQUENCE_NUMBER largo No Número único asignado a un mensaje por Service Bus.
Estado ServiceBusMessageHeaders#STATE ServiceBusMessageState No Estado del mensaje, que puede ser Activo, Diferido o Programado.

Compatibilidad con claves de partición

Este inicio admite la creación de particiones de Service Bus, ya que permite establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.

Recomendado: Usar ServiceBusMessageHeaders.PARTITION_KEY como clave del encabezado.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

No se recomienda, pero actualmente se admite:AzureHeaders.PARTITION_KEY como clave del encabezado.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Cuando ServiceBusMessageHeaders.PARTITION_KEY y AzureHeaders.PARTITION_KEY se establecen en los encabezados del mensaje, se prefiere ServiceBusMessageHeaders.PARTITION_KEY.

Compatibilidad con sesiones

En este ejemplo se muestra cómo establecer manualmente el identificador de sesión de un mensaje en la aplicación.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Cuando el ServiceBusMessageHeaders.SESSION_ID se establece en los encabezados del mensaje y también se establece un encabezado de ServiceBusMessageHeaders.PARTITION_KEY diferente, el valor del identificador de sesión se usará finalmente para sobrescribir el valor de la clave de partición.

Personalización de las propiedades del cliente de Service Bus

Los desarrolladores pueden usar AzureServiceClientBuilderCustomizer para personalizar las propiedades del cliente de Service Bus. En el ejemplo siguiente se personaliza la propiedad sessionIdleTimeout en ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Muestras

Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.

Integración de Spring con la cola de Azure Storage

Conceptos clave

Azure Queue Storage es un servicio para almacenar un gran número de mensajes. Puede acceder a los mensajes desde cualquier lugar del mundo a través de llamadas autenticadas mediante HTTP o HTTPS. Un mensaje de cola puede tener un tamaño de hasta 64 KB. Una cola puede contener millones de mensajes, hasta el límite de capacidad total de una cuenta de almacenamiento. Las colas se usan normalmente para crear un trabajo pendiente para procesar de forma asincrónica.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes opciones de configuración:

Propiedades de configuración de conexión

Esta sección contiene las opciones de configuración que se usan para conectarse a la cola de Azure Storage.

Nota

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Propiedades configurables de conexión de spring-cloud-azure-starter-integration-storage-queue:

Propiedad Tipo Descripción
spring.cloud.azure.storage.queue.enabled booleano Si una cola de Azure Storage está habilitada.
spring.cloud.azure.storage.queue.connection-string Cuerda Valor de cadena de conexión de espacio de nombres de cola de storage.
spring.cloud.azure.storage.queue.accountName Cuerda Nombre de la cuenta de cola de Storage.
spring.cloud.azure.storage.queue.accountKey Cuerda Clave de cuenta de cola de Storage.
spring.cloud.azure.storage.queue.endpoint Cuerda Punto de conexión de Servicio de cola de Storage.
spring.cloud.azure.storage.queue.sasToken Cuerda Credencial del token de Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion que se usa al realizar solicitudes de API.
spring.cloud.azure.storage.queue.messageEncoding Cuerda Codificación de mensajes de cola.

Uso básico

Envío de mensajes a la cola de Azure Storage

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Nota

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el StorageQueueTemplate bean para enviar mensajes a la cola de Storage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Recepción de mensajes de la cola de Azure Storage

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree StorageQueueMessageSource con el StorageQueueTemplate bean para recibir mensajes en la cola de Storage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Cree un enlace de receptor de mensajes con StorageQueueMessageSource creado en el último paso a través del canal de mensajes que creamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Muestras

Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.