Compatibilidad de Spring Cloud azure con Spring Cloud Stream
Este artículo se aplica a:✅ versión 4.19.0 ✅ versión 5.19.0
Spring Cloud Stream es un marco para crear microservicios basados en eventos altamente escalables conectados con sistemas de mensajería compartidos.
El marco proporciona un modelo de programación flexible basado en expresiones y procedimientos recomendados de Spring ya establecidos y conocidos. Estos procedimientos recomendados incluyen compatibilidad con la semántica persistente de pub/sub, grupos de consumidores y particiones con estado.
Las implementaciones actuales del enlazador incluyen:
-
spring-cloud-azure-stream-binder-eventhubs
: para más información, consulte Spring Cloud Stream Binder for Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
: para obtener más información, consulte Spring Cloud Stream Binder for Azure Service Bus
Spring Cloud Stream Binder para Azure Event Hubs
Conceptos clave
Spring Cloud Stream Binder para Azure Event Hubs proporciona la implementación de enlace para el marco de Spring Cloud Stream. Esta implementación usa adaptadores de canal de Spring Integration Event Hubs en su base. Desde la perspectiva del diseño, Event Hubs es similar a Kafka. Además, se puede acceder a Event Hubs a través de la API de Kafka. Si el proyecto tiene una dependencia estrecha de la API de Kafka, puede probar Centro de eventos con la API de Kafka
Grupo de consumidores
Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.
Compatibilidad con la creación de particiones
Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué consumidor posee la partición. Cuando se inicia un nuevo consumidor, intenta robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.
Para especificar la estrategia de equilibrio de carga, se proporcionan las propiedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Para obtener más información, consulte la sección
Compatibilidad con consumidores de Batch
El enlazador de Azure Stream Event Hubs de Spring Cloud admite característica de consumidor de Batch de Spring Cloud Stream.
Para trabajar con el modo de consumidor por lotes, establezca la propiedad spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
en true
. Cuando está habilitado, se recibe un mensaje con una carga de una lista de eventos por lotes y se pasa a la función Consumer
. Cada encabezado de mensaje también se convierte en una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola se presentan como un valor único porque todo el lote de eventos comparte el mismo valor. Para obtener más información, consulte la sección encabezados de mensaje de Event Hubs de compatibilidad de Azure de Spring Cloud con Spring Integration.
Nota
El encabezado de punto de control solo existe cuando se usa el modo de punto de comprobación de MANUAL
.
El punto de comprobación del consumidor por lotes admite dos modos: BATCH
y MANUAL
.
BATCH
modo es un modo de punto de comprobación automático para controlar todo el lote de eventos juntos una vez que el enlazador los recibe.
MANUAL
modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, el Checkpointer
se pasa al encabezado del mensaje y los usuarios pueden usarlo para realizar puntos de control.
Puede especificar el tamaño del lote estableciendo las propiedades max-size
y max-wait-time
que tienen un prefijo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. La propiedad max-size
es necesaria y la propiedad max-wait-time
es opcional. Para obtener más información, consulte la sección
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Como alternativa, también puede usar Spring Cloud Azure Stream Event Hubs Starter, como se muestra en el ejemplo siguiente de Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuración
El enlazador proporciona las tres partes siguientes de las opciones de configuración:
Propiedades de configuración de conexión
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.
Nota
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Propiedades configurables de conexión de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleano | Si una instancia de Azure Event Hubs está habilitada. |
spring.cloud.azure.eventhubs.connection-string | Cuerda | Valor de cadena de conexión del espacio de nombres de Event Hubs. |
spring.cloud.azure.eventhubsespacio de nombres .namespace | Cuerda | Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Cuerda | Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Cuerda | Dirección del punto de conexión personalizado. |
Propina
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Azure Stream Event Hubs de Spring Cloud. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure.
o el prefijo de spring.cloud.azure.eventhubs.
.
El enlazador también admite Spring Podría Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar la cadena de conexión con entidades de seguridad que no se conceden con roles relacionados con
Propiedades de configuración del punto de control
Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.
Nota
Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará ningún contenedor de almacenamiento automáticamente con el nombre de spring.cloud.stream.bindings.binding-name.destination.
Propiedades configurables de puntos de control de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleano | Si quiere permitir la creación de contenedores si no existe. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Cuerda | Nombre de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Cuerda | Clave de acceso de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Cuerda | Nombre del contenedor de almacenamiento. |
Propina
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure.
o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propiedades de configuración de enlace de Azure Event Hubs
Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.
Propiedades del consumidor
Estas propiedades se exponen a través de EventHubsConsumerProperties
.
Nota
Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Event Hubs admite la configuración de valores para todos los canales, en el formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Modo de punto de control usado cuando el consumidor decide cómo controlar el mensaje |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Entero | Decide la cantidad de mensaje de cada partición para realizar un punto de control. Solo surtirá efecto cuando se use PARTITION_COUNT modo de punto de comprobación. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duración | Decide el intervalo de tiempo para realizar un punto de control. Solo surtirá efecto cuando se use TIME modo de punto de comprobación. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Entero | Número máximo de eventos en un lote. Necesario para el modo de consumidor por lotes. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duración | Duración máxima de tiempo para el consumo por lotes. Solo surtirá efecto cuando el modo de consumidor por lotes esté habilitado y sea opcional. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duración | Duración del intervalo para la actualización. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Estrategia de equilibrio de carga. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duración | Duración de tiempo después de la cual expira la propiedad de la partición. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Booleano | Si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Entero | Recuento usado por el consumidor para controlar el número de eventos que el consumidor del centro de eventos recibirá y pondrá en cola de forma activa local. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Asignación con la clave como identificador de partición y valores de StartPositionProperties |
Mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en el almacén de puntos de control. Este mapa se clave fuera del identificador de partición. |
Nota
La configuración de initial-partition-event-position
acepta un map
para especificar la posición inicial de cada centro de eventos. Por lo tanto, su clave es el identificador de partición y el valor es de StartPositionProperties
, que incluye propiedades de desplazamiento, número de secuencia, fecha y hora de puesta en cola y si incluyen. Por ejemplo, puede establecerlo como
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Configuración avanzada del consumidor
La conexión de anterior, punto de controly cliente común de Azure SDK la personalización de la configuración para cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Propiedades del productor
Estas propiedades se exponen a través de EventHubsProducerProperties
.
Nota
Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Event Hubs admite la configuración de valores para todos los canales, en el formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Propiedades configurables del productor de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | booleano | Marca de conmutador para la sincronización del productor. Si es true, el productor esperará una respuesta después de una operación de envío. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | largo | Cantidad de tiempo para esperar una respuesta después de una operación de envío. Solo surtirá efecto cuando un productor de sincronización esté habilitado. |
Configuración avanzada del productor
El de conexión de
Uso básico
Envío y recepción de mensajes desde y hacia Event Hubs
Rellene las opciones de configuración con información de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Defina proveedor y consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Compatibilidad con la creación de particiones
Se crea un PartitionSupplier
con información de partición proporcionada por el usuario para configurar la información de partición sobre el mensaje que se va a enviar. En el diagrama de flujo siguiente se muestra el proceso de obtención de diferentes prioridades para el identificador y la clave de partición:
Compatibilidad con consumidores de Batch
Proporcione las opciones de configuración por lotes, como se muestra en el ejemplo siguiente:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Defina proveedor y consumidor.
Para el modo de punto de comprobación como
BATCH
, puede usar el código siguiente para enviar mensajes y consumir en lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Para el modo de punto de comprobación como
MANUAL
, puede usar el código siguiente para enviar mensajes y consumir o punto de control en lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Nota
En el modo de consumo por lotes, el tipo de contenido predeterminado del enlazador de Spring Cloud Stream es application/json
, por lo que asegúrese de que la carga del mensaje esté alineada con el tipo de contenido. Por ejemplo, al usar el tipo de contenido predeterminado de application/json
para recibir mensajes con String
carga, la carga debe estar JSON String
, rodeada de comillas dobles para el texto de String
original. Aunque para text/plain
tipo de contenido, puede ser un objeto String
directamente. Para obtener más información, consulte negociación de tipos de contenido de Spring Cloud Stream.
Control de mensajes de error
Control de mensajes de error de enlace de salida
De forma predeterminada, Spring Integration crea un canal de error global denominado
errorChannel
. Configure el siguiente punto de conexión de mensaje para controlar los mensajes de error de enlace de salida.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Control de mensajes de error de enlace de entrada
Spring Cloud Stream Event Hubs Binder admite una solución para controlar los errores de los enlaces de mensajes entrantes: controladores de errores.
controlador de errores:
Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado agregando un
Consumer
que acepta instancias deErrorMessage
. Para obtener más información, consulte Controlar mensajes de error en la documentación de Spring Cloud Stream.Controlador de errores predeterminado de enlace
Configure un único
Consumer
bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la propiedad
spring.cloud.stream.default.error-handler-definition
en el nombre de la función.Controlador de errores específico del enlace
Configure un
Consumer
bean para consumir los mensajes de error de enlace de entrada específicos. La siguiente función se suscribe al canal de error de enlace de entrada específico y tiene una prioridad más alta que el controlador de errores predeterminado de enlace:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
También debe establecer la propiedad
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
en el nombre de la función.
Encabezados de mensaje de Event Hubs
Para conocer los encabezados de mensaje básicos admitidos, consulte la sección encabezados de mensaje de Event Hubs de compatibilidad de Azure de Spring Cloud con Spring Integration.
Compatibilidad con varios enlazador
También se admite la conexión a varios espacios de nombres de Event Hubs mediante varios enlazadores. En este ejemplo se toma una cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas. Puede establecer propiedades relacionadas en la configuración del entorno de cada enlazador.
Para usar varios enlazadores con Event Hubs, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Nota
El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Necesitamos definir dos proveedores y dos consumidores:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Aprovisionamiento de recursos
El enlazador de Event Hubs admite el aprovisionamiento del centro de eventos y el grupo de consumidores, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Muestras
Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.
Spring Cloud Stream Binder para Azure Service Bus
Conceptos clave
Spring Cloud Stream Binder para Azure Service Bus proporciona la implementación de enlace para Spring Cloud Stream Framework. Esta implementación usa adaptadores de canal de Spring Integration Service Bus en su base.
Mensaje programado
Este enlazador admite el envío de mensajes a un tema para el procesamiento retrasado. Los usuarios pueden enviar mensajes programados con encabezado x-delay
expresar en milisegundos un tiempo de retraso para el mensaje. El mensaje se entregará a los temas respectivos después de x-delay
milisegundos.
Grupo de consumidores
El tema de Service Bus proporciona compatibilidad similar al grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente.
Este enlazador se basa en Subscription
de un tema para actuar como un grupo de consumidores.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Como alternativa, también puede usar Spring Cloud Azure Stream Service Bus Starter, como se muestra en el ejemplo siguiente de Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuración
El enlazador proporciona las dos partes siguientes de las opciones de configuración:
Propiedades de configuración de conexión
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.
Nota
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorizar el acceso con el identificador de Entra de Microsoft para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Propiedades configurables de conexión de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleano | Indica si una instancia de Azure Service Bus está habilitada. |
spring.cloud.azure.servicebus.connection-string | Cuerda | Valor de cadena de conexión del espacio de nombres de Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | Cuerda | Dirección del punto de conexión personalizado que se va a usar al conectarse a Service Bus. |
espacio de nombres spring.cloud.azure.servicebus.namespace | Cuerda | Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Cuerda | Nombre de dominio de un valor de espacio de nombres de Azure Service Bus. |
Nota
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Spring Cloud Azure Stream Service Bus. Las opciones de configuración admitidas se presentan en configuración de Azure de Spring Cloudy se pueden configurar con el prefijo unificado spring.cloud.azure.
o el prefijo de spring.cloud.azure.servicebus.
.
El enlazador también admite Spring Podría Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar la cadena de conexión con entidades de seguridad que no se conceden con roles relacionados con
Propiedades de configuración de enlace de Azure Service Bus
Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.
Propiedades del consumidor
Estas propiedades se exponen a través de ServiceBusConsumerProperties
.
Nota
Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Service Bus admite valores de configuración para todos los canales, en el formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Predeterminado | Descripción |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | booleano | falso | Si los mensajes con errores se enrutan a dlq. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Entero | 1 | Número máximo de mensajes simultáneos que debe procesar el cliente del procesador de Service Bus. Cuando la sesión está habilitada, se aplica a cada sesión. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Entero | nulo | Número máximo de sesiones simultáneas que se van a procesar en un momento dado. |
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session | Booleano | nulo | Indica si la sesión está habilitada. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Entero | 0 | Recuento de capturas previas del cliente de procesador de Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | ninguno | Tipo de la sub cola a la que se va a conectar. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duración | 5 m | Cantidad de tiempo para continuar la renovación automática del bloqueo. |
spring.cloud.stream.servicebus.bindings.binding-name.consumermodo de recepción | ServiceBusReceiveMode | peek_lock | Modo de recepción del cliente de procesador de Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.autocompletar | Booleano | verdadero | Si se deben liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de mensaje de Checkpointer para permitir a los desarrolladores liquidar los mensajes manualmente. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes | long | 1024 | Tamaño máximo de la cola o tema en megabytes, que es el tamaño de la memoria asignada para la cola o tema. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Duración | P10675199DT2H48M5.4775807S. (10675199 días, 2 horas, 48 minutos, 5 segundos y 477 milisegundos) | Duración después de la cual expira el mensaje, comenzando desde cuándo se envía el mensaje a Service Bus. |
Importante
Al usar el azure Resource Manager (ARM), debe configurar la propiedad spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Para obtener más información, consulte el ejemplo servicebus-queue-binder-arm en GitHub.
Configuración avanzada del consumidor
Los de conexión
Propiedades del productor
Estas propiedades se exponen a través de ServiceBusProducerProperties
.
Nota
Para evitar la repetición, dado que la versión 4.19.0 y 5.19.0, Spring Cloud Azure Stream Binder Service Bus admite valores de configuración para todos los canales, en el formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Propiedades configurables del productor de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Predeterminado | Descripción |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | booleano | falso | Marca switch para la sincronización del productor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | largo | 10000 | Valor de tiempo de espera para el envío del productor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nulo | Tipo de entidad de Service Bus del productor, necesario para el productor de enlace. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | long | 1024 | Tamaño máximo de la cola o tema en megabytes, que es el tamaño de la memoria asignada para la cola o tema. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Duración | P10675199DT2H48M5.4775807S. (10675199 días, 2 horas, 48 minutos, 5 segundos y 477 milisegundos) | Duración después de la cual expira el mensaje, comenzando desde cuándo se envía el mensaje a Service Bus. |
Importante
Al usar el productor de enlaces, se requiere configurar la propiedad de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Configuración avanzada del productor
El de conexión de
Uso básico
Envío y recepción de mensajes desde o a Service Bus
Rellene las opciones de configuración con información de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Defina proveedor y consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Compatibilidad con claves de partición
El enlazador admite la creación de particiones de Service Bus, ya que permite establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.
Spring Cloud Stream proporciona una propiedad de expresión spEL de clave de partición spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Por ejemplo, establezca esta propiedad como "'partitionKey-' + headers[<message-header-key>]"
y agregue un encabezado denominado message-header-key. Spring Cloud Stream usa el valor de este encabezado al evaluar la expresión para asignar una clave de partición. El código siguiente proporciona un productor de ejemplo:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Compatibilidad con sesiones
El enlazador admite sesiones de mensajes de Service Bus. El identificador de sesión de un mensaje se puede establecer a través del encabezado del mensaje.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Nota
Según creación de particiones de Service Bus, el identificador de sesión tiene una prioridad más alta que la clave de partición. Por lo tanto, cuando se establecen los encabezados ServiceBusMessageHeaders#SESSION_ID
y ServiceBusMessageHeaders#PARTITION_KEY
, el valor del identificador de sesión se usa finalmente para sobrescribir el valor de la clave de partición.
Control de mensajes de error
Control de mensajes de error de enlace de salida
De forma predeterminada, Spring Integration crea un canal de error global denominado
errorChannel
. Configure el siguiente punto de conexión de mensaje para controlar el mensaje de error de enlace de salida.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Control de mensajes de error de enlace de entrada
Spring Cloud Stream Service Bus Binder admite dos soluciones para controlar los errores de los enlaces de mensajes entrantes: el controlador de errores del enlazador y los controladores.
controlador de errores del enlazador:
El controlador de errores del enlazador predeterminado controla el enlace de entrada. Este controlador se usa para enviar mensajes con errores a la cola de mensajes fallidos cuando se habilita
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
. De lo contrario, los mensajes con error se abandonan. El controlador de errores del enlazador se excluye mutuamente con otros controladores de errores proporcionados.controlador de errores:
Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado agregando un
Consumer
que acepta instancias deErrorMessage
. Para obtener más información, consulte Controlar mensajes de error en la documentación de Spring Cloud Stream.Controlador de errores predeterminado de enlace
Configure un único
Consumer
bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la propiedad
spring.cloud.stream.default.error-handler-definition
en el nombre de la función.Controlador de errores específico del enlace
Configure un
Consumer
bean para consumir los mensajes de error de enlace de entrada específicos. La función siguiente se suscribe al canal de error de enlace de entrada específico con una prioridad más alta que el controlador de errores predeterminado de enlace.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la propiedad
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
en el nombre de la función.
Encabezados de mensaje de Service Bus
Para conocer los encabezados de mensaje básicos admitidos, consulte la sección encabezados de mensaje de Service Bus de compatibilidad de Spring Cloud con Azure para Spring Integration.
Nota
Al establecer la clave de partición, la prioridad del encabezado del mensaje es mayor que la propiedad Spring Cloud Stream. Por lo tanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
surte efecto solo cuando no se configura ninguno de los encabezados ServiceBusMessageHeaders#SESSION_ID
y ServiceBusMessageHeaders#PARTITION_KEY
.
Compatibilidad con varios enlazador
También se admite la conexión a varios espacios de nombres de Service Bus mediante varios enlazadores. En este ejemplo se toma la cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas, los usuarios pueden establecer propiedades relacionadas en la configuración del entorno de cada enlazador.
Para usar varios enlazadores de ServiceBus, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Nota
El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.necesitamos definir dos proveedores y dos consumidores
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Aprovisionamiento de recursos
El enlazador de Service Bus admite el aprovisionamiento de colas, temas y suscripciones, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Nota
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre cómo convertir la aplicación de un solo inquilino, consulte Convertir aplicación de un solo inquilino en multiinquilino en microsoft Entra ID.
Personalización de las propiedades del cliente de Service Bus
Los desarrolladores pueden usar AzureServiceClientBuilderCustomizer
para personalizar las propiedades del cliente de Service Bus. En el ejemplo siguiente se personaliza la propiedad sessionIdleTimeout
en ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Muestras
Para más información, consulte el repositorio de azure-spring-boot-samples en GitHub.