Spring Cloud Soporte técnico de Azure for Spring Cloud Stream
Este artículo se aplica a: ✔️ Versión 4.14.0 ✔️ versión 5.8.0
Spring Cloud Stream es un marco para crear microservicios basados en eventos altamente escalables conectados con sistemas de mensajería compartidos.
El marco proporciona un modelo de programación flexible basado en expresiones y procedimientos recomendados de Spring ya establecidos y conocidos. Estos procedimientos recomendados incluyen compatibilidad con la semántica persistente de pub/sub, grupos de consumidores y particiones con estado.
Las implementaciones actuales del enlazador incluyen:
spring-cloud-azure-stream-binder-eventhubs
: para más información, consulte Spring Cloud Stream Binder para Azure Event Hubs.spring-cloud-azure-stream-binder-servicebus
: para más información, consulte Spring Cloud Stream Binder para Azure Service Bus.
Spring Cloud Stream Binder para Azure Event Hubs
Conceptos clave
Spring Cloud Stream Binder para Azure Event Hubs proporciona la implementación de enlace para el marco de Spring Cloud Stream. Esta implementación usa adaptadores de canal de Spring Integration Event Hubs en su base. Desde la perspectiva del diseño, Event Hubs es similar a Kafka. Además, se puede acceder a Event Hubs a través de la API de Kafka. Si el proyecto tiene una dependencia estrecha en la API de Kafka, puede probar el centro de eventos con el ejemplo de API de Kafka.
Grupo de consumidores
Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.
Compatibilidad con particiones
Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué consumidor posee la partición. Cuando se inicia un nuevo consumidor, intenta robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.
Para especificar la estrategia de equilibrio de carga, se proporcionan las propiedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Para obtener más información, consulte la sección Propiedades del consumidor.
Compatibilidad con consumidores de Batch
El enlazador de Event Hubs de Azure Stream de Spring Cloud admite la característica consumidor de Batch de Spring Cloud Stream.
Para trabajar con el modo de consumidor por lotes, establezca la spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
propiedad true
en . Cuando está habilitado, se recibe un mensaje con una carga de una lista de eventos por lotes y se pasa a la Consumer
función . Cada encabezado de mensaje también se convierte en una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola se presentan como un valor único porque todo el lote de eventos comparte el mismo valor. Para obtener más información, consulte la sección Encabezados de mensajes de Event Hubs de Spring Cloud Soporte técnico de Azure for Spring Integration.
Nota:
El encabezado de punto de control solo existe cuando se usa el modo de MANUAL
punto de control.
El punto de comprobación del consumidor por lotes admite dos modos: BATCH
y MANUAL
. BATCH
mode es un modo de control automático para controlar todo el lote de eventos una vez que el enlazador los recibe. MANUAL
el modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, Checkpointer
se pasa al encabezado del mensaje y los usuarios pueden usarlo para realizar puntos de control.
Puede especificar el tamaño del lote estableciendo las max-size
propiedades y max-wait-time
que tienen un prefijo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. La max-size
propiedad es necesaria y la max-wait-time
propiedad es opcional. Para obtener más información, consulte la sección Propiedades del consumidor.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Como alternativa, también puede usar Spring Cloud Azure Stream Event Hubs Starter, como se muestra en el ejemplo siguiente de Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuración
El enlazador proporciona las tres partes siguientes de las opciones de configuración:
propiedades de configuración de Conectar ion
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.
Nota:
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Conectar las propiedades configurables de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Si una instancia de Azure Event Hubs está habilitada. |
spring.cloud.azure.eventhubs.connection-string | Cadena | Espacio de nombres de Event Hubs cadena de conexión valor. |
spring.cloud.azure.eventhubs.namespace | Cadena | Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Cadena | Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Cadena | Dirección del punto de conexión personalizado. |
Sugerencia
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Azure Stream Event Hubs de Spring Cloud. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure.
unificado o el prefijo de spring.cloud.azure.eventhubs.
.
El enlazador también admite Spring Could Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar el cadena de conexión con entidades de seguridad que no se conceden con Data
roles relacionados, consulte la sección Uso básico de Spring Could Azure Resource Manager.
Propiedades de configuración del punto de control
Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.
Nota:
Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará ningún contenedor de almacenamiento automáticamente con el nombre de spring.cloud.stream.bindings.binding-name.destination.
Propiedades configurables de puntos de control de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleano | Si quiere permitir la creación de contenedores si no existe. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Cadena | Nombre de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Cadena | Clave de acceso de la cuenta de almacenamiento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Cadena | Nombre del contenedor de almacenamiento. |
Sugerencia
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure.
unificado o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propiedades de configuración de enlace de Azure Event Hubs
Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.
Propiedades del consumidor
Estas propiedades se exponen a través de EventHubsConsumerProperties
.
Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Modo de punto de control usado cuando el consumidor decide cómo controlar el mensaje |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Entero | Decide la cantidad de mensaje de cada partición para realizar un punto de control. Solo surtirá efecto cuando PARTITION_COUNT se use el modo de punto de control. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duration | Decide el intervalo de tiempo para realizar un punto de control. Solo surtirá efecto cuando TIME se use el modo de punto de control. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Entero | Número máximo de eventos en un lote. Necesario para el modo de consumidor por lotes. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duration | Duración máxima de tiempo para el consumo por lotes. Solo surtirá efecto cuando el modo de consumidor por lotes esté habilitado y sea opcional. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duration | Duración del intervalo para la actualización. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Estrategia de equilibrio de carga. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duration | Duración de tiempo después de la cual expira la propiedad de la partición. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Booleano | Si el procesador de eventos debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Entero | Recuento usado por el consumidor para controlar el número de eventos que el consumidor del centro de eventos recibirá y pondrá en cola de forma activa local. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Asignación con la clave como identificador de partición y valores de StartPositionProperties |
Mapa que contiene la posición del evento que se va a usar para cada partición si no existe un punto de control para la partición en el almacén de puntos de control. Este mapa se clave fuera del identificador de partición. |
Nota:
La initial-partition-event-position
configuración acepta para map
especificar la posición inicial de cada centro de eventos. Por lo tanto, su clave es el identificador de partición, y el valor es de StartPositionProperties
, que incluye propiedades de desplazamiento, número de secuencia, fecha y hora de puesta en cola y si son inclusivos. Por ejemplo, puede establecerlo como
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Configuración avanzada del consumidor
La conexión anterior, el punto de control y la configuración común del cliente del SDK de Azure admiten la personalización de cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Propiedades del productor
Estas propiedades se exponen a través de EventHubsProducerProperties
.
Propiedades configurables del productor de spring-cloud-azure-stream-binder-eventhubs:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolean | Marca de conmutador para la sincronización del productor. Si es true, el productor esperará una respuesta después de una operación de envío. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | long | Cantidad de tiempo para esperar una respuesta después de una operación de envío. Solo surtirá efecto cuando un productor de sincronización esté habilitado. |
Configuración avanzada del productor
La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada productor de enlazador, que puede configurar con el prefijo spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Uso básico
Envío y recepción de mensajes desde y hacia Event Hubs
Rellene las opciones de configuración con información de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Nota:
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Defina proveedor y consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Compatibilidad con particiones
Se crea un PartitionSupplier
objeto con información de partición proporcionada por el usuario para configurar la información de partición sobre el mensaje que se va a enviar. En el diagrama de flujo siguiente se muestra el proceso de obtención de diferentes prioridades para el identificador y la clave de partición:
Compatibilidad con consumidores de Batch
Proporcione las opciones de configuración por lotes, como se muestra en el ejemplo siguiente:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Defina proveedor y consumidor.
Para el modo de punto de control como
BATCH
, puede usar el código siguiente para enviar mensajes y consumir en lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Para el modo de punto de comprobación como
MANUAL
, puede usar el código siguiente para enviar mensajes y consumir o punto de control en lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Nota:
En el modo de consumo por lotes, el tipo de contenido predeterminado del enlazador de Spring Cloud Stream es application/json
, por lo que asegúrese de que la carga del mensaje esté alineada con el tipo de contenido. Por ejemplo, cuando se usa el tipo de contenido predeterminado de application/json
para recibir mensajes con String
carga, la carga debe estar JSON String
rodeada de comillas dobles para el texto original String
. Mientras que para text/plain
el tipo de contenido, puede ser un String
objeto directamente. Para más información, consulte Negociación de tipos de contenido de Spring Cloud Stream.
Control de los mensajes de error
Control de mensajes de error de enlace de salida
De forma predeterminada, Spring Integration crea un canal de error global denominado
errorChannel
. Configure el siguiente punto de conexión de mensaje para controlar los mensajes de error de enlace salientes:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Control de mensajes de error de enlace de entrada
Spring Cloud Stream Event Hubs Binder admite dos soluciones para controlar los errores de los enlaces de mensajes entrantes: canales de error y controladores personalizados.
Canal de error:
Spring Cloud Stream proporciona un canal de error para cada enlace entrante.
ErrorMessage
Se envía al canal de error. Para más información, consulte Control de errores en la documentación de Spring Cloud Stream.Canal de error predeterminado
Puede usar un canal de error global denominado
errorChannel
para consumir todos los mensajes de error de enlace entrantes. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Canal de error específico del enlace
Puede usar un canal de error específico para consumir los mensajes de error de enlace entrante específicos con una prioridad más alta que el canal de error predeterminado. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Nota:
El canal de error específico del enlace se excluye mutuamente con otros controladores de errores y canales proporcionados.
Controlador de errores:
Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado mediante la adición de un
Consumer
que aceptaErrorMessage
instancias. Para obtener más información, consulte Control de errores en la documentación de Spring Cloud Stream.Nota:
Cuando se configura cualquier controlador de errores de enlace, puede funcionar con el canal de error predeterminado.
Controlador de errores predeterminado de enlace
Configure un único
Consumer
bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la
spring.cloud.stream.default.error-handler-definition
propiedad en el nombre de la función.Controlador de errores específico del enlace
Configure un
Consumer
bean para consumir los mensajes de error de enlace de entrada específicos. La siguiente función se suscribe al canal de error de enlace de entrada específico y tiene una prioridad más alta que el controlador de errores predeterminado de enlace:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
También debe establecer la
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
propiedad en el nombre de la función.
Encabezados de mensaje de Event Hubs
Para conocer los encabezados de mensaje básicos admitidos, consulte la sección Encabezados de mensaje de Event Hubs de Spring Cloud Soporte técnico de Azure for Spring Integration.
Compatibilidad con varios enlazador
Conectar ion a varios espacios de nombres de Event Hubs también se admite mediante varios enlazadores. En este ejemplo se toma un cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas. Puede establecer propiedades relacionadas en la configuración del entorno de cada enlazador.
Para usar varios enlazadores con Event Hubs, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Nota:
El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Necesitamos definir dos proveedores y dos consumidores:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Aprovisionamiento de recursos
El enlazador de Event Hubs admite el aprovisionamiento del centro de eventos y el grupo de consumidores, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Nota:
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.
Ejemplos
Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.
Spring Cloud Stream Binder para Azure Service Bus
Conceptos clave
Spring Cloud Stream Binder para Azure Service Bus proporciona la implementación de enlace para Spring Cloud Stream Framework. Esta implementación usa adaptadores de canal de Spring Integration Service Bus en su base.
Mensaje programado
Este enlazador admite el envío de mensajes a un tema para el procesamiento retrasado. Los usuarios pueden enviar mensajes programados con encabezado x-delay
que expresa en milisegundos un tiempo de retraso para el mensaje. El mensaje se entregará a los temas respectivos después x-delay
de milisegundos.
Grupo de consumidores
El tema de Service Bus proporciona compatibilidad similar al grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente.
Este enlazador se basa en Subscription
un tema para actuar como un grupo de consumidores.
Configuración de dependencias
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Como alternativa, también puede usar Spring Cloud Azure Stream Service Bus Starter, como se muestra en el ejemplo siguiente de Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuración
El enlazador proporciona las dos partes siguientes de las opciones de configuración:
propiedades de configuración de Conectar ion
Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.
Nota:
Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.
Conectar las propiedades configurables de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Descripción |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Indica si una instancia de Azure Service Bus está habilitada. |
spring.cloud.azure.servicebus.connection-string | Cadena | Espacio de nombres de Service Bus cadena de conexión valor. |
espacio de nombres spring.cloud.azure.servicebus.namespace | Cadena | Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Cadena | Nombre de dominio de un valor de espacio de nombres de Azure Service Bus. |
Nota:
Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el enlazador de Spring Cloud Azure Stream Service Bus. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure.
unificado o el prefijo de spring.cloud.azure.servicebus.
.
El enlazador también admite Spring Could Azure Resource Manager de forma predeterminada. Para obtener información sobre cómo recuperar el cadena de conexión con entidades de seguridad que no se conceden con Data
roles relacionados, consulte la sección Uso básico de Spring Could Azure Resource Manager.
Propiedades de configuración de enlace de Azure Service Bus
Las siguientes opciones se dividen en cuatro secciones: Propiedades del consumidor, Configuraciones avanzadas de consumidor, Propiedades del productor y Configuraciones avanzadas de productor.
Propiedades del consumidor
Estas propiedades se exponen a través de ServiceBusConsumerProperties
.
Propiedades configurables del consumidor de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Valor predeterminado | Descripción |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | boolean | false | Si los mensajes con errores se enrutan a dlq. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Entero | 1 | Número máximo de mensajes simultáneos que debe procesar el cliente del procesador de Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Entero | nulo | Número máximo de sesiones simultáneas que se van a procesar en un momento dado. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Booleano | nulo | Indica si la sesión está habilitada. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Entero | 0 | Recuento de capturas previas del cliente de procesador de Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | None | Tipo de la sub cola a la que se va a conectar. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duration | 5 m | Cantidad de tiempo para continuar la renovación automática del bloqueo. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Modo de recepción del cliente de procesador de Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.autocompletar | Boolean | true | Si se deben liquidar los mensajes automáticamente. Si se establece como false, se agregará un encabezado de mensaje de Checkpointer para permitir a los desarrolladores liquidar los mensajes manualmente. |
Configuración avanzada del consumidor
La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada consumidor de enlazador, que puede configurar con el prefijo spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Propiedades del productor
Estas propiedades se exponen a través de ServiceBusProducerProperties
.
Propiedades configurables del productor de spring-cloud-azure-stream-binder-servicebus:
Propiedad | Tipo | Valor predeterminado | Descripción |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolean | false | Marca switch para la sincronización del productor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | long | 10 000 | Valor de tiempo de espera para el envío del productor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nulo | Tipo de entidad de Service Bus del productor, necesario para el productor de enlace. |
Importante
Al usar el productor de enlace, se requiere configurar la propiedad de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Configuración avanzada del productor
La conexión anterior y la configuración de cliente común del SDK de Azure admiten la personalización de cada productor de enlazador, que puede configurar con el prefijo spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Uso básico
Envío y recepción de mensajes desde o a Service Bus
Rellene las opciones de configuración con información de credenciales.
Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Nota:
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.
Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Defina proveedor y consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Compatibilidad con claves de partición
El enlazador admite la creación de particiones de Service Bus al permitir establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.
Spring Cloud Stream proporciona una propiedad spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
de expresión spEL de clave de partición . Por ejemplo, establezca esta propiedad como "'partitionKey-' + headers[<message-header-key>]"
y agregue un encabezado denominado message-header-key. Spring Cloud Stream usa el valor de este encabezado al evaluar la expresión para asignar una clave de partición. El código siguiente proporciona un productor de ejemplo:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Compatibilidad con sesiones
El enlazador admite sesiones de mensajes de Service Bus. El identificador de sesión de un mensaje se puede establecer a través del encabezado del mensaje.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Nota:
Según la creación de particiones de Service Bus, el identificador de sesión tiene mayor prioridad que la clave de partición. Por lo tanto, cuando se establecen los ServiceBusMessageHeaders#SESSION_ID
encabezados y ServiceBusMessageHeaders#PARTITION_KEY
, el valor del identificador de sesión se usa finalmente para sobrescribir el valor de la clave de partición.
Control de los mensajes de error
Control de mensajes de error de enlace de salida
De forma predeterminada, Spring Integration crea un canal de error global denominado
errorChannel
. Configure el siguiente punto de conexión de mensaje para controlar el mensaje de error de enlace de salida.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Control de mensajes de error de enlace de entrada
Spring Cloud Stream Service Bus Binder admite tres soluciones para controlar los errores de los enlaces de mensajes entrantes: el controlador de errores del enlazador, los canales de error personalizados y los controladores.
Controlador de errores del enlazador:
El controlador de errores del enlazador predeterminado controla el enlace de entrada. Este controlador se usa para enviar mensajes con errores a la cola de mensajes fallidos cuando
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
está habilitado. De lo contrario, los mensajes con error se abandonan. Excepto para configurar el canal de error específico del enlace, el controlador de errores del enlazador siempre surte efecto independientemente de si hay otros controladores de errores personalizados o canales.Canal de error:
Spring Cloud Stream proporciona un canal de error para cada enlace entrante.
ErrorMessage
Se envía al canal de error. Para más información, consulte Control de errores en la documentación de Spring Cloud Stream.Canal de error predeterminado
Puede usar un canal de error global denominado
errorChannel
para consumir todos los mensajes de error de enlace entrantes. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Canal de error específico del enlace
Puede usar un canal de error específico para consumir los mensajes de error de enlace entrante específicos con una prioridad más alta que el canal de error predeterminado. Para controlar estos mensajes, configure el siguiente punto de conexión de mensaje:
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Nota:
El canal de error específico del enlace se excluye mutuamente con otros controladores de errores y canales proporcionados.
Controlador de errores:
Spring Cloud Stream expone un mecanismo para proporcionar un controlador de errores personalizado mediante la adición de un
Consumer
que aceptaErrorMessage
instancias. Para obtener más información, consulte Control de errores en la documentación de Spring Cloud Stream.Nota:
Cuando se configura cualquier controlador de errores de enlace, puede funcionar con el canal de error predeterminado y el controlador de errores del enlazador.
Controlador de errores predeterminado de enlace
Configure un único
Consumer
bean para consumir todos los mensajes de error de enlace entrantes. La siguiente función predeterminada se suscribe a cada canal de error de enlace entrante:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la
spring.cloud.stream.default.error-handler-definition
propiedad en el nombre de la función.Controlador de errores específico del enlace
Configure un
Consumer
bean para consumir los mensajes de error de enlace de entrada específicos. La función siguiente se suscribe al canal de error de enlace de entrada específico con una prioridad más alta que el controlador de errores predeterminado de enlace.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
También debe establecer la
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
propiedad en el nombre de la función.
Encabezados de mensaje de Service Bus
Para ver los encabezados de mensaje básicos admitidos, consulte la sección Encabezados de mensaje de Service Bus de Spring Cloud Soporte técnico de Azure for Spring Integration.
Nota:
Al establecer la clave de partición, la prioridad del encabezado del mensaje es mayor que la propiedad Spring Cloud Stream. Por lo tanto spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
, solo surte efecto cuando no se configura ninguno de los ServiceBusMessageHeaders#SESSION_ID
encabezados y ServiceBusMessageHeaders#PARTITION_KEY
.
Compatibilidad con varios enlazador
Conectar ion a varios espacios de nombres de Service Bus también se admite mediante el uso de varios enlazadores. Este ejemplo toma cadena de conexión como ejemplo. También se admiten credenciales de entidades de servicio e identidades administradas, los usuarios pueden establecer propiedades relacionadas en la configuración del entorno de cada enlazador.
Para usar varios enlazadores de ServiceBus, configure las siguientes propiedades en el archivo application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Nota:
El archivo de aplicación anterior muestra cómo configurar un único sondeo predeterminado para la aplicación en todos los enlaces. Si desea configurar el sondeo para un enlace específico, puede usar una configuración como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.necesitamos definir dos proveedores y dos consumidores
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Aprovisionamiento de recursos
El enlazador de Service Bus admite el aprovisionamiento de colas, temas y suscripciones, los usuarios podrían usar las siguientes propiedades para habilitar el aprovisionamiento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Nota:
Los valores permitidos para tenant-id
son: common
, organizations
, consumers
o el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.
Ejemplos
Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.