Suporte do Spring Cloud Azure para o Spring Cloud Stream
Este artigo aplica-se a:✅ Versão 4.19.0 ✅ Versão 5.19.0
O Spring Cloud Stream é uma estrutura para a criação de microsserviços altamente escaláveis orientados a eventos conectados com sistemas de mensagens compartilhados.
A estrutura fornece um modelo de programação flexível baseado em expressões idiomáticas e melhores práticas Spring já estabelecidas e familiares. Essas práticas recomendadas incluem suporte para semântica pub/sub persistente, grupos de consumidores e partições com monitoração de estado.
As implementações atuais do fichário incluem:
-
spring-cloud-azure-stream-binder-eventhubs
- para obter mais informações, consulte Spring Cloud Stream Binder for Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
- para obter mais informações, consulte Spring Cloud Stream Binder for Azure Service Bus
Spring Cloud Stream Binder para Hubs de Eventos do Azure
Conceitos-chave
O Spring Cloud Stream Binder para Hubs de Eventos do Azure fornece a implementação de vinculação para a estrutura do Spring Cloud Stream. Esta implementação usa os adaptadores de canal do Spring Integration Event Hubs em sua base. Do ponto de vista do design, os Hubs de Eventos são semelhantes a Kafka. Além disso, os Hubs de Eventos podem ser acessados via Kafka API. Se o seu projeto tiver dependência total da API Kafka, você pode tentar Hub de Eventos com a API Kafka Sample
Grupo de consumidores
Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores do Apache Kafka, mas com uma lógica ligeiramente diferente. Enquanto Kafka armazena todos os deslocamentos confirmados no broker, você precisa armazenar offsets de mensagens de Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.
Suporte de particionamento
Os Hubs de Eventos fornecem um conceito de partição física semelhante ao Kafka. Mas, ao contrário do reequilíbrio automático entre consumidores e partições de Kafka, os Hubs de Eventos fornecem uma espécie de modo preventivo. A conta de armazenamento atua como uma locação para determinar qual consumidor possui qual partição. Quando um novo consumidor começa, ele tenta roubar algumas partições dos consumidores mais carregados para alcançar o equilíbrio da carga de trabalho.
Para especificar a estratégia de balanceamento de carga, as propriedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
são fornecidas. Para obter mais informações, consulte a seção propriedades do consumidor
Suporte ao consumidor em lote
O fichário do Spring Cloud Azure Stream Event Hubs suporta recurso Spring Cloud Stream Batch Consumer.
Para trabalhar com o modo batch-consumer, defina a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
como true
. Quando habilitada, uma mensagem com uma carga de uma lista de eventos em lote é recebida e passada para a função Consumer
. Cada cabeçalho de mensagem também é convertido em uma lista, cujo conteúdo é o valor de cabeçalho associado analisado de cada evento. Os cabeçalhos comuns de ID de partição, ponteiro de verificação e últimas propriedades enfileiradas são apresentados como um único valor porque todo o lote de eventos compartilha o mesmo valor. Para obter mais informações, consulte a seção cabeçalhos de mensagem dos Hubs de Eventos de suporte do Spring Cloud Azure para o Spring Integration.
Observação
O cabeçalho do ponto de verificação só existe quando o modo de ponto de verificação MANUAL
é usado.
O ponto de verificação do consumidor em lote suporta dois modos: BATCH
e MANUAL
.
BATCH
modo é um modo de ponto de verificação automático para verificar todo o lote de eventos juntos assim que o fichário os recebe.
MANUAL
modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer
é passado para o cabeçalho da mensagem e os usuários podem usá-lo para fazer checkpointing.
Você pode especificar o tamanho do lote definindo as propriedades max-size
e max-wait-time
que têm um prefixo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. A propriedade max-size
é necessária e a propriedade max-wait-time
é opcional. Para obter mais informações, consulte a seção propriedades do consumidor
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Como alternativa, você também pode usar o Spring Cloud Azure Stream Event Hubs Starter, conforme mostrado no exemplo a seguir para o Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuração
O fichário fornece as seguintes três partes das opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.
Observação
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com o Microsoft Entra ID para verificar se a entidade de segurança recebeu a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.enabled | Booleano | Se um Hubs de Eventos do Azure está habilitado. |
spring.cloud.azure.eventhubs.connection-string | String | Valor da cadeia de conexão de namespace de Hubs de Eventos. |
spring.cloud.azure.eventhubs.namespace | String | Valor de namespace de Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Nome de domínio de um valor de Namespace de Hubs de Eventos do Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Endereço de ponto final personalizado. |
Dica
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o fichário dos Hubs de Eventos do Azure Stream do Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.eventhubs.
.
O fichário também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com Data
funções relacionadas, consulte a seção de uso básico de Spring Could Azure Resource Manager.
Propriedades de configuração do ponto de verificação
Esta seção contém as opções de configuração para o serviço de Blobs de Armazenamento, que é usado para manter a propriedade da partição e as informações do ponto de verificação.
Observação
A partir da versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente com o nome de spring.cloud.stream.bindings.binding-name.destination.
Propriedades configuráveis de ponto de verificação de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleano | Se a criação de contêineres não existe, deve ser permitida. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Nome da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Chave de acesso da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Nome do recipiente de armazenamento. |
Dica
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o armazenamento de pontos de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriedades de configuração de vinculação dos Hubs de Eventos do Azure
As opções a seguir estão divididas em quatro seções: Propriedades do consumidor, Configurações avançadas do consumidor, Propriedades do produtor e Configurações avançadas do produtor.
Propriedades de consumo
Estas propriedades são expostas através de EventHubsConsumerProperties
.
Observação
Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Event Hubs oferece suporte à configuração de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Propriedades configuráveis pelo consumidor de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Modo de ponto de verificação usado quando o consumidor decide como enviar a mensagem de ponto de verificação |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Inteiro | Decide a quantidade de mensagem para cada partição para fazer um ponto de verificação. Entrará em vigor somente quando PARTITION_COUNT modo de ponto de verificação for usado. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duração | Decide o intervalo de tempo para fazer um ponto de verificação. Entrará em vigor somente quando TIME modo de ponto de verificação for usado. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Inteiro | O número máximo de eventos em um lote. Necessário para o modo batch-consumer. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-tempo de espera | Duração | A duração máxima do consumo em lote. Entrará em vigor somente quando o modo batch-consumer estiver ativado e for opcional. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duração | A duração do intervalo de tempo para atualização. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | A estratégia de balanceamento de carga. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duração | A duração do tempo após o qual a propriedade da partição expira. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumerpropriedades .track-last-enqueued-event-properties | Booleano | Se o processador de eventos deve solicitar informações sobre o último evento enfileirado em sua partição associada e controlar essas informações à medida que os eventos são recebidos. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Inteiro | A contagem usada pelo consumidor para controlar o número de eventos que o consumidor do Hub de Eventos receberá ativamente e enfileirará localmente. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mapeie com a chave como ID da partição e valores de StartPositionProperties |
O mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir no armazenamento de pontos de verificação. Este mapa é chaveado fora do ID da partição. |
Observação
A configuração initial-partition-event-position
aceita um map
para especificar a posição inicial para cada hub de eventos. Assim, sua chave é o ID da partição, e o valor é de StartPositionProperties
, que inclui propriedades de deslocamento, número de sequência, data e hora enfileirada e se inclusive. Por exemplo, você pode defini-lo como
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Configuração avançada do consumidor
Osde conexão de
Propriedades do produtor
Estas propriedades são expostas através de EventHubsProducerProperties
.
Observação
Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Event Hubs oferece suporte à configuração de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | Booleano | O sinalizador de switch para sincronização do produtor. Se verdadeiro, o produtor aguardará uma resposta após uma operação de envio. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | Longo | A quantidade de tempo para aguardar uma resposta após uma operação de envio. Terá efeito somente quando um produtor de sincronização estiver habilitado. |
Configuração avançada do produtor
O de conexão de
Utilização básica
Enviar e receber mensagens de/para Hubs de Eventos
Preencha as opções de configuração com informações de credencial.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Observação
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção
Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definir fornecedor e consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Suporte de particionamento
Um PartitionSupplier
com informações de partição fornecidas pelo usuário é criado para configurar as informações de partição sobre a mensagem a ser enviada. O fluxograma a seguir mostra o processo de obtenção de diferentes prioridades para o ID e a chave da partição:
Suporte ao consumidor em lote
Forneça as opções de configuração de lote, conforme mostrado no exemplo a seguir:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definir fornecedor e consumidor.
Para o modo de ponto de verificação como
BATCH
, você pode usar o código a seguir para enviar mensagens e consumir em lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Para o modo de ponto de verificação como
MANUAL
, você pode usar o código a seguir para enviar mensagens e consumir/ponto de verificação em lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Observação
No modo de consumo em lote, o tipo de conteúdo padrão do fichário do Spring Cloud Stream é application/json
, portanto, verifique se a carga útil da mensagem está alinhada com o tipo de conteúdo. Por exemplo, ao usar o tipo de conteúdo padrão de application/json
para receber mensagens com String
carga útil, a carga deve ser JSON String
, cercada de aspas duplas para o texto String
original. Enquanto para text/plain
tipo de conteúdo, ele pode ser um objeto String
diretamente. Para obter mais informações, consulte Spring Cloud Stream Content Type Negotiation.
Manipular mensagens de erro
Manipular mensagens de erro de vinculação de saída
Por padrão, o Spring Integration cria um canal de erro global chamado
errorChannel
. Configure o ponto de extremidade de mensagem a seguir para lidar com mensagens de erro de vinculação de saída.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Manipular mensagens de erro de vinculação de entrada
O Spring Cloud Stream Event Hubs Binder suporta uma solução para lidar com erros para as ligações de mensagens de entrada: manipuladores de erros.
Manipulador de erros:
O Spring Cloud Stream expõe um mecanismo para você fornecer um manipulador de erros personalizado adicionando um
Consumer
que aceita instânciasErrorMessage
. Para obter mais informações, consulte Manipular mensagens de erro na documentação do Spring Cloud Stream.Manipulador de erro de vinculação-padrão
Configure um único bean de
Consumer
para consumir todas as mensagens de erro de vinculação de entrada. A seguinte função padrão se inscreve em cada canal de erro de vinculação de entrada:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.default.error-handler-definition
para o nome da função.Manipulador de erros específico da vinculação
Configure um
Consumer
bean para consumir as mensagens de erro de vinculação de entrada específicas. A função a seguir se inscreve no canal de erro de vinculação de entrada específico e tem uma prioridade maior do que o manipulador de erro vinculante-padrão:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
para o nome da função.
Cabeçalhos de mensagem dos Hubs de Eventos
Para obter os cabeçalhos de mensagem básicos suportados, consulte a seção cabeçalhos de mensagem dos Hubs de Eventos de suporte do Spring Cloud Azure para o Spring Integration.
Suporte a vários fichários
A conexão com vários namespaces de Hubs de Eventos também é suportada usando vários fichários. Este exemplo usa uma cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são suportadas. Você pode definir propriedades relacionadas nas configurações de ambiente de cada fichário.
Para usar vários fichários com Hubs de Eventos, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Observação
O arquivo de aplicativo anterior mostra como configurar um único poller padrão para o aplicativo para todas as associações. Se quiser configurar o poller para uma ligação específica, você pode usar uma configuração como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Precisamos definir dois fornecedores e dois consumidores:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Provisionamento de recursos
O fichário de Hubs de Eventos oferece suporte ao provisionamento de hub de eventos e grupo de consumidores, os usuários podem usar as seguintes propriedades para habilitar o provisionamento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Observação
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção
Amostras
Para obter mais informações, consulte o repositório azure-spring-boot-samples
Spring Cloud Stream Binder para o Barramento de Serviço do Azure
Conceitos-chave
O Spring Cloud Stream Binder for Azure Service Bus fornece a implementação de vinculação para o Spring Cloud Stream Framework. Esta implementação usa adaptadores de canal do Spring Integration Service Bus em sua base.
Mensagem agendada
Este fichário suporta o envio de mensagens para um tópico para processamento atrasado. Os usuários podem enviar mensagens agendadas com cabeçalho x-delay
expressando em milissegundos um tempo de atraso para a mensagem. A mensagem será entregue aos respetivos tópicos após x-delay
milissegundos.
Grupo de consumidores
O Service Bus Topic fornece suporte semelhante ao grupo de consumidores do Apache Kafka, mas com uma lógica ligeiramente diferente.
Este aglutinante baseia-se na Subscription
de um tópico para atuar como um grupo de consumidores.
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Como alternativa, você também pode usar o Spring Cloud Azure Stream Service Bus Starter, conforme mostrado no exemplo a seguir para o Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuração
O fichário fornece as seguintes duas partes das opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.
Observação
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com o Microsoft Entra ID para verificar se a entidade de segurança recebeu a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.servicebus.enabled | Booleano | Se um Barramento de Serviço do Azure está habilitado. |
spring.cloud.azure.servicebus.connection-string | String | Valor da cadeia de conexão do Namespace do Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | String | O endereço de ponto de extremidade personalizado a ser usado ao se conectar ao Service Bus. |
spring.cloud.azure.servicebus.namespace | String | Valor de Namespace do Service Bus, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Nome de domínio de um valor de Namespace do Barramento de Serviço do Azure. |
Observação
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o fichário do Spring Cloud Azure Stream Service Bus. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.servicebus.
.
O fichário também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com Data
funções relacionadas, consulte a seção de uso básico de Spring Could Azure Resource Manager.
Propriedades de configuração de vinculação do Barramento de Serviço do Azure
As opções a seguir estão divididas em quatro seções: Propriedades do consumidor, Configurações avançadas do consumidor, Propriedades do produtor e Configurações avançadas do produtor.
Propriedades de consumo
Estas propriedades são expostas através de ServiceBusConsumerProperties
.
Observação
Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Service Bus suporta a definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Propriedades configuráveis pelo consumidor de spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Inadimplência | Descrição |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | Booleano | falso | Se as mensagens com falha forem roteadas para o DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Inteiro | 1 | Max mensagens simultâneas que o cliente do processador do Service Bus deve processar. Quando a sessão está ativada, aplica-se a cada sessão. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Inteiro | nulo | Número máximo de sessões simultâneas a serem processadas a qualquer momento. |
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session | Booleano | nulo | Se a sessão está habilitada. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Inteiro | 0 | A contagem de pré-busca do cliente do processador do Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.subfila | Subfila | nenhum | O tipo de subfila à qual se conectar. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duração | 5 metros | A quantidade de tempo para continuar a renovação automática do bloqueio. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | O modo de recebimento do cliente do processador do Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Booleano | verdadeiro | Se as mensagens devem ser liquidadas automaticamente. Se definido como false, um cabeçalho de mensagem de Checkpointer será adicionado para permitir que os desenvolvedores liquidem as mensagens manualmente. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes | Longo | 1024 | O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Duração | P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) | A duração após a qual a mensagem expira, a partir de quando a mensagem é enviada para o Service Bus. |
Importante
Ao usar o Azure Resource Manager (ARM), você deve configurar a propriedade spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Para obter mais informações, consulte o exemplo de servicebus-queue-binder-arm no GitHub.
Configuração avançada do consumidor
O de conexão de
Propriedades do produtor
Estas propriedades são expostas através de ServiceBusProducerProperties
.
Observação
Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Service Bus suporta a definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Inadimplência | Descrição |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | Booleano | falso | Sinalizador de switch para sincronização do produtor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | Longo | 10000 | Valor de tempo limite para envio do produtor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nulo | Tipo de entidade do Service Bus do produtor, necessário para o produtor de ligação. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | Longo | 1024 | O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Duração | P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) | A duração após a qual a mensagem expira, a partir de quando a mensagem é enviada para o Service Bus. |
Importante
Ao usar o produtor de ligação, a propriedade de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
deve ser configurada.
Configuração avançada do produtor
O de conexão de
Utilização básica
Enviar e receber mensagens de/para o Service Bus
Preencha as opções de configuração com informações de credencial.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Observação
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção
Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definir fornecedor e consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Suporte de chave de partição
O fichário suporta de particionamento do Service Bus, permitindo definir a chave de partição e o ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.
O Spring Cloud Stream fornece uma propriedade de expressão SpEL de chave de partição spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Por exemplo, definir essa propriedade como "'partitionKey-' + headers[<message-header-key>]"
e adicionar um cabeçalho chamado message-header-key. O Spring Cloud Stream usa o valor desse cabeçalho ao avaliar a expressão para atribuir uma chave de partição. O código a seguir fornece um exemplo de produtor:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Suporte de sessão
O fichário suporta sessões de mensagens do Service Bus. O ID de sessão de uma mensagem pode ser definido através do cabeçalho da mensagem.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Observação
De acordo com de particionamento do Service Bus, o ID da sessão tem prioridade maior do que a chave de partição. Assim, quando os cabeçalhos ServiceBusMessageHeaders#SESSION_ID
e ServiceBusMessageHeaders#PARTITION_KEY
são definidos, o valor do ID da sessão é eventualmente usado para substituir o valor da chave de partição.
Manipular mensagens de erro
Manipular mensagens de erro de vinculação de saída
Por padrão, o Spring Integration cria um canal de erro global chamado
errorChannel
. Configure o ponto de extremidade da seguinte mensagem para manipular a mensagem de erro de vinculação de saída.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Manipular mensagens de erro de vinculação de entrada
O Spring Cloud Stream Service Bus Binder suporta duas soluções para lidar com erros para as ligações de mensagens de entrada: o manipulador de erros do fichário e os manipuladores.
Manipulador de erro do fichário:
O manipulador de erros de fichário padrão lida com a associação de entrada. Use esse manipulador para enviar mensagens com falha para a fila de mensagens mortas quando
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
estiver habilitado. Caso contrário, as mensagens com falha serão abandonadas. O manipulador de erros do fichário é mutuamente exclusivo com outros manipuladores de erro fornecidos.Manipulador de erros:
O Spring Cloud Stream expõe um mecanismo para você fornecer um manipulador de erros personalizado adicionando um
Consumer
que aceita instânciasErrorMessage
. Para obter mais informações, consulte Manipular mensagens de erro na documentação do Spring Cloud Stream.Manipulador de erro de vinculação-padrão
Configure um único bean de
Consumer
para consumir todas as mensagens de erro de vinculação de entrada. A seguinte função padrão se inscreve em cada canal de erro de vinculação de entrada:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.default.error-handler-definition
para o nome da função.Manipulador de erros específico da vinculação
Configure um
Consumer
bean para consumir as mensagens de erro de vinculação de entrada específicas. A função a seguir se inscreve no canal de erro de vinculação de entrada específico com uma prioridade maior do que o manipulador de erro de vinculação padrão.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
para o nome da função.
Cabeçalhos de mensagens do Barramento de Serviço
Para obter os cabeçalhos de mensagem básicos suportados, consulte a seção cabeçalhos de mensagem do
Observação
Ao definir a chave de partição, a prioridade do cabeçalho da mensagem é maior do que a propriedade do Spring Cloud Stream. Portanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
só entra em vigor quando nenhum dos cabeçalhos ServiceBusMessageHeaders#SESSION_ID
e ServiceBusMessageHeaders#PARTITION_KEY
estiver configurado.
Suporte a vários fichários
A conexão com vários namespaces do Service Bus também é suportada usando vários fichários. Este exemplo usa a cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são suportadas, os usuários podem definir propriedades relacionadas nas configurações de ambiente de cada fichário.
Para usar vários fichários do ServiceBus, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Observação
O arquivo de aplicativo anterior mostra como configurar um único poller padrão para o aplicativo para todas as associações. Se quiser configurar o poller para uma ligação específica, você pode usar uma configuração como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.precisamos definir dois fornecedores e dois consumidores
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Provisionamento de recursos
O fichário do barramento de serviço suporta o provisionamento de fila, tópico e assinatura, os usuários podem usar as seguintes propriedades para habilitar o provisionamento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Observação
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção
Personalizar as propriedades do cliente do Service Bus
Os desenvolvedores podem usar AzureServiceClientBuilderCustomizer
para personalizar as propriedades do Service Bus Client. O exemplo a seguir personaliza a propriedade sessionIdleTimeout
no ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Amostras
Para obter mais informações, consulte o repositório azure-spring-boot-samples