Compatibilidad de Spring Cloud con Spring Integration
Este artículo se aplica a:✅ versión 4.19.0 ✅ versión 5.19.0
La extensión de integración de Spring para Azure proporciona adaptadores de integración de Spring para los distintos servicios proporcionados por el SDK de Azure para Java. Proporcionamos compatibilidad con Spring Integration para estos servicios de Azure: Event Hubs, Service Bus, Storage Queue. A continuación se muestra una lista de adaptadores admitidos:
-
spring-cloud-azure-starter-integration-eventhubs
: para más información, consulte Spring Integration con Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
: para obtener más información, consulte Spring Integration with Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
: para obtener más información, consulte Spring Integration with Azure Storage Queue
Integración de Spring con Azure Event Hubs
Conceptos clave
Azure Event Hubs es una plataforma de streaming de macrodatos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo. Los datos enviados a un centro de eventos se pueden transformar y almacenar mediante cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes o almacenamiento.
Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos. Esos adaptadores proporcionan un nivel superior de abstracción a través de la compatibilidad de Spring con la comunicación remota, la mensajería y la programación. El proyecto de extensión Spring Integration for Event Hubs proporciona adaptadores de canal entrantes y salientes y puertas de enlace para Azure Event Hubs.
Nota
Las API de soporte técnico de RxJava se quitan de la versión 4.0.0. Consulte Javadoc para obtener más información.
Grupo de consumidores
Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.
Compatibilidad con la creación de particiones
Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué partición es propiedad del consumidor. Cuando se inicia un nuevo consumidor, intentará robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.
Para especificar la estrategia de equilibrio de carga, los desarrolladores pueden usar EventHubsContainerProperties
para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties
.
Compatibilidad con consumidores de Batch
El EventHubsInboundChannelAdapter
admite el modo de consumo por lotes. Para habilitarlo, los usuarios pueden especificar el modo de escucha como ListenerMode.BATCH
al construir una instancia de EventHubsInboundChannelAdapter
.
Cuando está habilitada, se recibirá una mensaje de la que la carga es una lista de eventos por lotes y se pasará al canal de bajada. Cada encabezado de mensaje también se convierte como una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Para los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola, se presentan como un valor único para todo el lote de eventos que comparten el mismo. Para obtener más información, consulte la sección encabezados de mensajes de Event Hubs.
Nota
El encabezado de punto de control solo existe cuando se usa modo manual punto de control.
El punto de comprobación del consumidor por lotes admite dos modos: BATCH
y MANUAL
.
BATCH
modo es un modo de punto de comprobación automático para controlar todo el lote de eventos una vez recibidos.
MANUAL
modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, el
La directiva de consumo por lotes se puede especificar mediante propiedades de max-size
y max-wait-time
, donde max-size
es una propiedad necesaria mientras max-wait-time
es opcional.
Para especificar la estrategia de consumo por lotes, los desarrolladores pueden usar EventHubsContainerProperties
para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties
.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configuración
Este inicio proporciona las siguientes tres partes de las opciones de configuración:
Propiedades de configuración de conexión
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.
Nota
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Propiedades configurables de conexión de spring-cloud-azure-starter-integration-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleano | Si una instancia de Azure Event Hubs está habilitada. |
spring.cloud.azure.eventhubs.connection-string | Cuerda | Valor de cadena de conexión del espacio de nombres de Event Hubs. |
spring.cloud.azure.eventhubsespacio de nombres .namespace | Cuerda | Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Cuerda | Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Cuerda | Dirección del punto de conexión personalizado. |
spring.cloud.azure.eventhubs.shared-connection | Booleano | Si EventProcessorClient y EventHubProducerAsyncClient subyacente usan la misma conexión. De forma predeterminada, se crea una nueva conexión y se usa para cada cliente del centro de eventos creado. |
Propiedades de configuración del punto de control
Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.
Nota
Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará automáticamente ningún contenedor de almacenamiento.
Propiedades configurables de punto de control de spring-cloud-azure-starter-integration-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleano | Si quiere permitir la creación de contenedores si no existe. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Cuerda | Nombre de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Cuerda | Clave de acceso de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Cuerda | Nombre del contenedor de almacenamiento. |
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure.
o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propiedades de configuración del procesador del centro de eventos
El EventHubsInboundChannelAdapter
usa el EventProcessorClient
para consumir mensajes de un centro de eventos, para configurar las propiedades generales de un EventProcessorClient
, los desarrolladores pueden usar EventHubsContainerProperties
para la configuración. Consulte la sección siguiente sobre cómo trabajar con EventHubsInboundChannelAdapter
.
Uso básico
Envío de mensajes a Azure Event Hubs
Rellene las opciones de configuración de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Cree
DefaultMessageHandler
con elEventHubsTemplate
bean para enviar mensajes a Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Enviar mensajes mediante la puerta de enlace.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recepción de mensajes de Azure Event Hubs
Rellene las opciones de configuración de credenciales.
Cree un bean del canal de mensajes como canal de entrada.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Cree
EventHubsInboundChannelAdapter
con elEventHubsMessageListenerContainer
bean para recibir mensajes de Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Cree un enlace de receptor de mensajes con EventHubsInboundChannelAdapter a través del canal de mensajes creado antes.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configuración de EventHubsMessageConverter para personalizar objectMapper
EventHubsMessageConverter
se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper.
Compatibilidad con consumidores de Batch
Para consumir mensajes de Event Hubs en lotes es similar al ejemplo anterior, además de que los usuarios deben establecer las opciones de configuración relacionadas que consumen por lotes para EventHubsInboundChannelAdapter
.
Al crear EventHubsInboundChannelAdapter
, el modo de escucha debe establecerse como BATCH
. Cuando cree bean de EventHubsMessageListenerContainer
, establezca el modo de punto de comprobación como MANUAL
o BATCH
, y las opciones de lote se pueden configurar según sea necesario.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Encabezados de mensaje de Event Hubs
En la tabla siguiente se muestra cómo se asignan las propiedades del mensaje de Event Hubs a los encabezados de mensaje de Spring. Para Azure Event Hubs, se llama al mensaje como event
.
Asignación entre las propiedades de evento o mensaje de Event Hubs y los encabezados de mensaje de Spring en modo de agente de escucha de registros:
Propiedades del evento de Event Hubs | Constantes de encabezado de mensaje de Spring | Tipo | Descripción |
---|---|---|---|
Hora puesta en cola | EventHubsHeaders#ENQUEUED_TIME | Instante | El instante, en UTC, de cuándo se puso en cola el evento en la partición del centro de eventos. |
Compensar | EventHubsHeaders#OFFSET | Largo | Desplazamiento del evento cuando se recibió de la partición del centro de eventos asociada. |
Clave de partición | AzureHeaders#PARTITION_KEY | Cuerda | Clave hash de partición si se estableció al publicar originalmente el evento. |
Id. de partición | AzureHeaders#RAW_PARTITION_ID | Cuerda | Identificador de partición del centro de eventos. |
Número de secuencia | EventHubsHeaders#SEQUENCE_NUMBER | Largo | Número de secuencia asignado al evento cuando se puso en cola en la partición del centro de eventos asociado. |
Últimas propiedades de evento en cola | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Propiedades del último evento en cola en esta partición. |
NA | AzureHeaders#CHECKPOINTER | Checkpointer | Encabezado del punto de control del mensaje específico. |
Los usuarios pueden analizar los encabezados de mensaje para obtener la información relacionada de cada evento. Para establecer un encabezado de mensaje para el evento, todos los encabezados personalizados se colocarán como una propiedad de aplicación de un evento, donde el encabezado se establece como clave de propiedad. Cuando se reciben eventos de Event Hubs, todas las propiedades de la aplicación se convertirán en el encabezado del mensaje.
Nota
No se admiten los encabezados de mensaje de clave de partición, tiempo en cola, desplazamiento y número de secuencia para establecerse manualmente.
Cuando el modo de consumidor por lotes está habilitado, los encabezados específicos de los mensajes por lotes se enumeran a continuación, que contiene una lista de valores de cada evento de Event Hubs único.
Asignación entre event Hubs Message/Event Properties y Spring Message Headers en el modo de agente de escucha por lotes:
Propiedades del evento de Event Hubs | Constantes de encabezado de mensaje de Spring Batch | Tipo | Descripción |
---|---|---|---|
Hora puesta en cola | EventHubsHeaders#ENQUEUED_TIME | Lista de instantáneas | Lista del instante, en UTC, de cuándo se puso en cola cada evento en la partición del centro de eventos. |
Compensar | EventHubsHeaders#OFFSET | Lista de long | Lista del desplazamiento de cada evento cuando se recibió de la partición del centro de eventos asociada. |
Clave de partición | AzureHeaders#PARTITION_KEY | Lista de cadenas | Lista de la clave de hash de partición si se estableció al publicar originalmente cada evento. |
Número de secuencia | EventHubsHeaders#SEQUENCE_NUMBER | Lista de long | Lista del número de secuencia asignado a cada evento cuando se puso en cola en la partición del centro de eventos asociada. |
Propiedades del sistema | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lista de mapa | Lista de las propiedades del sistema de cada evento. |
Propiedades de la aplicación | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lista de mapa | Lista de las propiedades de la aplicación de cada evento, donde se colocan todos los encabezados de mensaje personalizados o propiedades de evento. |
Nota
Al publicar mensajes, todos los encabezados por lotes anteriores se quitarán de los mensajes si existen.
Muestras
Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.
Integración de Spring con Azure Service Bus
Conceptos clave
Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos.
El proyecto de extensión Spring Integration for Azure Service Bus proporciona adaptadores de canal entrantes y salientes para Azure Service Bus.
Nota
Las API de soporte técnico completableFuture han quedado en desuso de la versión 2.10.0 y se reemplazan por Reactor Core de la versión 4.0.0. Consulte Javadoc para obtener más información.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configuración
Este inicio proporciona las siguientes 2 partes de las opciones de configuración:
Propiedades de configuración de conexión
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.
Nota
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Propiedades configurables de conexión de spring-cloud-azure-starter-integration-servicebus:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleano | Indica si una instancia de Azure Service Bus está habilitada. |
spring.cloud.azure.servicebus.connection-string | Cuerda | Valor de cadena de conexión del espacio de nombres de Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | Cuerda | Dirección del punto de conexión personalizado que se va a usar al conectarse a Service Bus. |
espacio de nombres spring.cloud.azure.servicebus.namespace | Cuerda | Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Cuerda | Nombre de dominio de un valor de espacio de nombres de Azure Service Bus. |
Propiedades de configuración del procesador de Service Bus
El ServiceBusInboundChannelAdapter
usa el ServiceBusProcessorClient
para consumir mensajes, para configurar las propiedades generales de un ServiceBusProcessorClient
, los desarrolladores pueden usar ServiceBusContainerProperties
para la configuración. Consulte la sección siguiente sobre cómo trabajar con ServiceBusInboundChannelAdapter
.
Uso básico
Envío de mensajes a Azure Service Bus
Rellene las opciones de configuración de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Cree
DefaultMessageHandler
con elServiceBusTemplate
bean para enviar mensajes a Service Bus y establezca el tipo de entidad para ServiceBusTemplate. En este ejemplo se toma la cola de Service Bus como ejemplo.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Enviar mensajes mediante la puerta de enlace.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recepción de mensajes de Azure Service Bus
Rellene las opciones de configuración de credenciales.
Cree un bean del canal de mensajes como canal de entrada.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Cree
ServiceBusInboundChannelAdapter
con elServiceBusMessageListenerContainer
bean para recibir mensajes en Service Bus. En este ejemplo se toma la cola de Service Bus como ejemplo.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Cree un enlace de receptor de mensajes con
ServiceBusInboundChannelAdapter
a través del canal de mensajes que creamos antes.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configuración de ServiceBusMessageConverter para personalizar objectMapper
ServiceBusMessageConverter
se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper
.
Encabezados de mensaje de Service Bus
Para algunos encabezados de Service Bus que se pueden asignar a varias constantes de encabezado de Spring, se muestra la prioridad de distintos encabezados de Spring.
Asignación entre encabezados de Service Bus y encabezados de Spring:
Encabezados y propiedades del mensaje de Service Bus | Constantes de encabezado de mensaje spring | Tipo | Configurable | Descripción |
---|---|---|---|---|
Tipo de contenido | MessageHeaders#CONTENT_TYPE |
Cuerda | Sí | Descriptor de tipo de contenido RFC2045 del mensaje. |
Id. de correlación | ServiceBusMessageHeaders#CORRELATION_ID |
Cuerda | Sí | Identificador de correlación del mensaje |
Id. de mensaje | ServiceBusMessageHeaders#MESSAGE_ID |
Cuerda | Sí | El identificador de mensaje del mensaje, este encabezado tiene mayor prioridad que MessageHeaders#ID . |
Id. de mensaje | MessageHeaders#ID |
UUID | Sí | El identificador de mensaje del mensaje, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#MESSAGE_ID . |
Clave de partición | ServiceBusMessageHeaders#PARTITION_KEY |
Cuerda | Sí | Clave de partición para enviar el mensaje a una entidad con particiones. |
Responder a | MessageHeaders#REPLY_CHANNEL |
Cuerda | Sí | Dirección de una entidad a la que se van a enviar respuestas. |
Responder al identificador de sesión | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Cuerda | Sí | Valor de la propiedad ReplyToGroupId del mensaje. |
Hora de puesta en cola programada utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Sí | Fecha y hora en la que se debe poner el mensaje en cola en Service Bus, este encabezado tiene mayor prioridad que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Hora de puesta en cola programada utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Entero | Sí | Fecha y hora en la que se debe poner en cola el mensaje en Service Bus, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Identificador de sesión | ServiceBusMessageHeaders#SESSION_ID |
Cuerda | Sí | Identificador de sesión para una entidad compatible con sesión. |
Período de vida | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duración | Sí | Duración del tiempo antes de que expire este mensaje. |
Para | ServiceBusMessageHeaders#TO |
Cuerda | Sí | La dirección "to" del mensaje, reservada para su uso futuro en escenarios de enrutamiento y actualmente ignorada por el propio agente. |
Asunto | ServiceBusMessageHeaders#SUBJECT |
Cuerda | Sí | Asunto del mensaje. |
Descripción del error de mensajes fallidos | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Cuerda | No | La descripción de un mensaje que se ha escrito con mensajes fallidos. |
Motivo de mensajes fallidos | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Cuerda | No | La razón por la que se ha producido un mensaje fallido. |
Origen de mensajes fallidos | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Cuerda | No | La entidad en la que el mensaje estaba en mensajes fallidos. |
Recuento de entregas | ServiceBusMessageHeaders#DELIVERY_COUNT |
largo | No | Número de veces que este mensaje se entregó a los clientes. |
Número de secuencia en cola | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
largo | No | Número de secuencia en cola asignado a un mensaje por Service Bus. |
Hora puesta en cola | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | No | Fecha y hora en la que se puso en cola este mensaje en Service Bus. |
Expira en | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | No | Fecha y hora en la que expirará este mensaje. |
Token de bloqueo | ServiceBusMessageHeaders#LOCK_TOKEN |
Cuerda | No | Token de bloqueo del mensaje actual. |
Bloqueado hasta | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | No | Fecha y hora en la que expira el bloqueo de este mensaje. |
Número de secuencia | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
largo | No | Número único asignado a un mensaje por Service Bus. |
Estado | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | No | Estado del mensaje, que puede ser Activo, Diferido o Programado. |
Compatibilidad con claves de partición
Este inicio admite la creación de particiones de Service Bus, ya que permite establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.
Recomendado: Usar ServiceBusMessageHeaders.PARTITION_KEY
como clave del encabezado.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
No se recomienda, pero actualmente se admite:AzureHeaders.PARTITION_KEY
como clave del encabezado.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Cuando ServiceBusMessageHeaders.PARTITION_KEY
y AzureHeaders.PARTITION_KEY
se establecen en los encabezados del mensaje, se prefiere ServiceBusMessageHeaders.PARTITION_KEY
.
Compatibilidad con sesiones
En este ejemplo se muestra cómo establecer manualmente el identificador de sesión de un mensaje en la aplicación.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Cuando el ServiceBusMessageHeaders.SESSION_ID
se establece en los encabezados del mensaje y también se establece un encabezado de ServiceBusMessageHeaders.PARTITION_KEY
diferente, el valor del identificador de sesión se usará finalmente para sobrescribir el valor de la clave de partición.
Personalización de las propiedades del cliente de Service Bus
Los desarrolladores pueden usar AzureServiceClientBuilderCustomizer
para personalizar las propiedades del cliente de Service Bus. En el ejemplo siguiente se personaliza la propiedad sessionIdleTimeout
en ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Muestras
Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.
Integración de Spring con la cola de Azure Storage
Conceptos clave
Azure Queue Storage es un servicio para almacenar un gran número de mensajes. Puede acceder a los mensajes desde cualquier lugar del mundo a través de llamadas autenticadas mediante HTTP o HTTPS. Un mensaje de cola puede tener un tamaño de hasta 64 KB. Una cola puede contener millones de mensajes, hasta el límite de capacidad total de una cuenta de almacenamiento. Las colas se usan normalmente para crear un trabajo pendiente para procesar de forma asincrónica.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configuración
Este inicio proporciona las siguientes opciones de configuración:
Propiedades de configuración de conexión
Esta sección contiene las opciones de configuración que se usan para conectarse a la cola de Azure Storage.
Nota
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Propiedades configurables de conexión de spring-cloud-azure-starter-integration-storage-queue:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booleano | Si una cola de Azure Storage está habilitada. |
spring.cloud.azure.storage.queue.connection-string | Cuerda | Valor de cadena de conexión de espacio de nombres de cola de storage. |
spring.cloud.azure.storage.queue.accountName | Cuerda | Nombre de la cuenta de cola de Storage. |
spring.cloud.azure.storage.queue.accountKey | Cuerda | Clave de cuenta de cola de Storage. |
spring.cloud.azure.storage.queue.endpoint | Cuerda | Punto de conexión de Servicio de cola de Storage. |
spring.cloud.azure.storage.queue.sasToken | Cuerda | Credencial del token de Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion que se usa al realizar solicitudes de API. |
spring.cloud.azure.storage.queue.messageEncoding | Cuerda | Codificación de mensajes de cola. |
Uso básico
Envío de mensajes a la cola de Azure Storage
Rellene las opciones de configuración de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Cree
DefaultMessageHandler
con elStorageQueueTemplate
bean para enviar mensajes a la cola de Storage.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Enviar mensajes mediante la puerta de enlace.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Recepción de mensajes de la cola de Azure Storage
Rellene las opciones de configuración de credenciales.
Cree un bean del canal de mensajes como canal de entrada.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Cree
StorageQueueMessageSource
con elStorageQueueTemplate
bean para recibir mensajes en la cola de Storage.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Cree un enlace de receptor de mensajes con StorageQueueMessageSource creado en el último paso a través del canal de mensajes que creamos antes.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Muestras
Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.