Suporte do Spring Cloud Azure para Spring Cloud Stream
Este artigo se aplica a:✅ versão 4.19.0 ✅ versão 5.18.0
O Spring Cloud Stream é uma estrutura para criar microsserviços altamente escalonáveis controlados por eventos conectados com sistemas de mensagens compartilhadas.
A estrutura fornece um modelo de programação flexível baseado em linguagens spring já estabelecidas e familiares e práticas recomendadas. Essas práticas recomendadas incluem suporte para semântica de pub/sub persistente, grupos de consumidores e partições com estado.
As implementações atuais do associador incluem:
-
spring-cloud-azure-stream-binder-eventhubs
- para obter mais informações, consulte Spring Cloud Stream Binder for Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
- para obter mais informações, consulte Spring Cloud Stream Binder for Azure Service Bus
Spring Cloud Stream Binder para Hubs de Eventos do Azure
Principais conceitos
O Spring Cloud Stream Binder para Hubs de Eventos do Azure fornece a implementação de associação para a estrutura do Spring Cloud Stream. Essa implementação usa adaptadores de canal dos Hubs de Eventos do Spring Integration em sua base. Do ponto de vista do design, os Hubs de Eventos são semelhantes ao Kafka. Além disso, os Hubs de Eventos podem ser acessados por meio da API do Kafka. Se o projeto tiver uma dependência apertada da API do Kafka, você poderá tentar Hub de Eventos com o Exemplo de API do Kafka
Grupo de consumidores
Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores como o Apache Kafka, mas com uma lógica ligeiramente diferente. Embora o Kafka armazene todos os deslocamentos confirmados no agente, você precisa armazenar deslocamentos de mensagens dos Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.
Suporte ao particionamento
Os Hubs de Eventos fornecem um conceito semelhante de partição física como Kafka. Mas, ao contrário do reequilíbrio automático do Kafka entre consumidores e partições, os Hubs de Eventos fornecem uma espécie de modo preemptivo. A conta de armazenamento atua como uma concessão para determinar qual consumidor possui qual partição. Quando um novo consumidor é iniciado, ele tenta roubar algumas partições dos consumidores mais carregados para obter o equilíbrio da carga de trabalho.
Para especificar a estratégia de balanceamento de carga, as propriedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
são fornecidas. Para obter mais informações, consulte a seção propriedades do consumidor
Suporte ao consumidor do lote
O associador de Hubs de Eventos do Spring Cloud Azure Stream dá suporte recurso consumidor do Lote do Spring Cloud Stream.
Para trabalhar com o modo de consumidor em lote, defina a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
como true
. Quando habilitada, uma mensagem com uma carga de uma lista de eventos em lote é recebida e passada para a função Consumer
. Cada cabeçalho de mensagem também é convertido em uma lista, da qual o conteúdo é o valor de cabeçalho associado analisado de cada evento. Os cabeçalhos comuns da ID da partição, do ponto de verificação e das últimas propriedades enfileiradas são apresentados como um único valor porque todo o lote de eventos compartilha o mesmo valor. Para obter mais informações, consulte os cabeçalhos de mensagens dos Hubs de Eventos seção de suporte do Spring Cloud Azure para o Spring Integration.
Nota
O cabeçalho de ponto de verificação só existe quando o modo de ponto de verificação MANUAL
é usado.
O ponto de verificação do consumidor em lotes dá suporte a dois modos: BATCH
e MANUAL
.
BATCH
modo é um modo de ponto de verificação automático para verificar o lote inteiro de eventos juntos depois que o associador os recebe.
MANUAL
modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer
é passado para o cabeçalho da mensagem e os usuários podem usá-lo para fazer o ponto de verificação.
Você pode especificar o tamanho do lote definindo as propriedades max-size
e max-wait-time
que têm um prefixo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. A propriedade max-size
é necessária e a propriedade max-wait-time
é opcional. Para obter mais informações, consulte a seção propriedades do consumidor
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Como alternativa, você também pode usar o Spring Cloud Azure Stream Event Hubs Starter, conforme mostrado no exemplo a seguir para o Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuração
O associador fornece as três partes a seguir das opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.
Nota
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleano | Se um Hub de Eventos do Azure está habilitado. |
spring.cloud.azure.eventhubs.connection-string | Corda | Valor da cadeia de conexão do Namespace dos Hubs de Eventos. |
spring.cloud.azure.eventhubs.namespace | Corda | Valor do Namespace dos Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Corda | Nome de domínio de um valor de namespace dos Hubs de Eventos do Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Corda | Endereço do ponto de extremidade personalizado. |
Ponta
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o associador dos Hubs de Eventos do Azure Stream do Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.eventhubs.
.
O associador também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber mais sobre como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com funções relacionadas Data
, consulte a seção Uso básico de Spring Could Azure Resource Manager.
Propriedades de configuração de ponto de verificação
Esta seção contém as opções de configuração para o serviço Blobs de Armazenamento, que é usado para persistir a propriedade da partição e informações de ponto de verificação.
Nota
Na versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente com o nome de spring.cloud.stream.bindings.binding-name.destination.
Propriedades configuráveis de ponto de verificação do spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleano | Se deseja permitir a criação de contêineres se não existir. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Corda | Nome da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Corda | Chave de acesso da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Corda | Nome do contêiner de armazenamento. |
Ponta
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o repositório de ponto de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriedades de configuração de associação dos Hubs de Eventos do Azure
As opções a seguir são divididas em quatro seções: Propriedades do Consumidor, Configurações Avançadas do Consumidor, Propriedades do Produtor e Configurações Avançadas de Produtor.
Propriedades do consumidor
Essas propriedades são expostas por meio de EventHubsConsumerProperties
.
Nota
Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, os Hubs de Eventos do Azure Stream Binder do Spring Cloud dão suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Propriedades configuráveis do consumidor de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Modo de ponto de verificação usado quando o consumidor decide como colocar a mensagem de ponto de verificação |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Inteiro | Decide a quantidade de mensagem para cada partição fazer um ponto de verificação. Entrará em vigor somente quando PARTITION_COUNT modo de ponto de verificação for usado. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duração | Decide o intervalo de tempo para fazer um ponto de verificação. Entrará em vigor somente quando TIME modo de ponto de verificação for usado. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Inteiro | O número máximo de eventos em um lote. Necessário para o modo de consumidor em lote. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duração | A duração máxima do tempo para consumo em lote. Entrará em vigor somente quando o modo de consumidor em lote estiver habilitado e for opcional. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duração | A duração do tempo de intervalo para atualização. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | A estratégia de balanceamento de carga. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duração | A duração do tempo após a qual a propriedade da partição expira. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Booleano | Se o processador de eventos deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Inteiro | A contagem usada pelo consumidor para controlar o número de eventos que o consumidor do Hub de Eventos receberá e enfileirará localmente. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mapeie com a chave como a ID da partição e os valores de StartPositionProperties |
O mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir no repositório de pontos de verificação. Esse mapa é retirado da ID da partição. |
Nota
A configuração de initial-partition-event-position
aceita uma map
para especificar a posição inicial para cada hub de eventos. Portanto, sua chave é a ID da partição e o valor é de StartPositionProperties
, que inclui propriedades de deslocamento, número de sequência, data e inclusão. Por exemplo, você pode defini-lo como
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Configuração avançada do consumidor
Ode conexão
Propriedades do produtor
Essas propriedades são expostas por meio de EventHubsProducerProperties
.
Nota
Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, os Hubs de Eventos do Azure Stream Binder do Spring Cloud dão suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-eventhubs:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | booleano | O sinalizador de comutador para sincronização do produtor. Se for true, o produtor aguardará uma resposta após uma operação de envio. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | Longas | A quantidade de tempo para aguardar uma resposta após uma operação de envio. Entrará em vigor somente quando um produtor de sincronização estiver habilitado. |
Configuração avançada do produtor
O de conexão
Uso básico
Enviar e receber mensagens de/para Hubs de Eventos
Preencha as opções de configuração com informações de credencial.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Nota
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definir fornecedor e consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Suporte ao particionamento
Um PartitionSupplier
com informações de partição fornecidas pelo usuário é criado para configurar as informações de partição sobre a mensagem a ser enviada. O fluxograma a seguir mostra o processo de obtenção de prioridades diferentes para a ID de partição e a chave:
Suporte ao consumidor do lote
Forneça as opções de configuração em lote, conforme mostrado no exemplo a seguir:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definir fornecedor e consumidor.
Para o modo de ponto de verificação como
BATCH
, você pode usar o código a seguir para enviar mensagens e consumir em lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Para o modo de ponto de verificação como
MANUAL
, você pode usar o código a seguir para enviar mensagens e consumir/ponto de verificação em lotes.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Nota
No modo de consumo em lote, o tipo de conteúdo padrão do associador spring cloud stream é application/json
, portanto, verifique se o conteúdo da mensagem está alinhado com o tipo de conteúdo. Por exemplo, ao usar o tipo de conteúdo padrão de application/json
para receber mensagens com String
conteúdo, o conteúdo deve ser JSON String
, cercado de aspas duplas para o texto de String
original. Embora para text/plain
tipo de conteúdo, ele pode ser um objeto String
diretamente. Para obter mais informações, consulte de negociação de tipo de conteúdo do Spring Cloud Stream.
Manipular mensagens de erro
Manipular mensagens de erro de associação de saída
Por padrão, o Spring Integration cria um canal de erro global chamado
errorChannel
. Configure o ponto de extremidade de mensagem a seguir para lidar com mensagens de erro de associação de saída.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Manipular mensagens de erro de associação de entrada
O Spring Cloud Stream Event Hubs Binder dá suporte a uma solução para lidar com erros para as associações de mensagens de entrada: manipuladores de erros.
do Manipulador de Erros:
O Spring Cloud Stream expõe um mecanismo para que você forneça um manipulador de erros personalizado adicionando um
Consumer
que aceitaErrorMessage
instâncias. Para obter mais informações, consulte manipular mensagens de erro na documentação do Spring Cloud Stream.Manipulador de erros padrão de associação
Configure um único
Consumer
bean para consumir todas as mensagens de erro de associação de entrada. A função padrão a seguir assina cada canal de erro de associação de entrada:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.default.error-handler-definition
como o nome da função.Manipulador de erros específico à associação
Configure um
Consumer
bean para consumir as mensagens de erro de associação de entrada específicas. A função a seguir assina o canal de erro de associação de entrada específico e tem uma prioridade maior do que o manipulador de erros padrão de associação:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
como o nome da função.
Cabeçalhos de mensagem dos Hubs de Eventos
Para obter os cabeçalhos de mensagem básicos com suporte, consulte os cabeçalhos de mensagens dos Hubs de Eventos seção de suporte do Spring Cloud Azure para o Spring Integration.
Suporte a vários associados
A conexão com vários namespaces de Hubs de Eventos também tem suporte usando vários associadores. Este exemplo usa uma cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são compatíveis. Você pode definir propriedades relacionadas nas configurações de ambiente de cada associador.
Para usar vários associadores com Hubs de Eventos, configure as seguintes propriedades em seu arquivo de application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Nota
O arquivo de aplicativo anterior mostra como configurar um único sondador padrão para aplicativo para todas as associações. Se você quiser configurar o poller para uma associação específica, poderá usar uma configuração como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Precisamos definir dois fornecedores e dois consumidores:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Provisionamento de recursos
O associador de Hubs de Eventos dá suporte ao provisionamento do hub de eventos e do grupo de consumidores, os usuários podem usar as propriedades a seguir para habilitar o provisionamento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Nota
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Amostras
Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.
Spring Cloud Stream Binder para Barramento de Serviço do Azure
Principais conceitos
O Spring Cloud Stream Binder para Barramento de Serviço do Azure fornece a implementação de associação para o Spring Cloud Stream Framework. Essa implementação usa adaptadores de canal do Barramento de Serviço do Spring Integration em sua base.
Mensagem agendada
Esse associador dá suporte ao envio de mensagens para um tópico para processamento atrasado. Os usuários podem enviar mensagens agendadas com cabeçalho x-delay
expressando em milissegundos um tempo de atraso para a mensagem. A mensagem será entregue aos respectivos tópicos após x-delay
milissegundos.
Grupo de consumidores
O Tópico do Barramento de Serviço fornece suporte semelhante ao grupo de consumidores como o Apache Kafka, mas com uma lógica ligeiramente diferente.
Esse associador depende de Subscription
de um tópico para atuar como um grupo de consumidores.
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Como alternativa, você também pode usar o Spring Cloud Azure Stream Service Bus Starter, conforme mostrado no exemplo a seguir para o Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuração
O associador fornece as duas seguintes partes das opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.
Nota
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão do spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Descrição |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleano | Se um Barramento de Serviço do Azure está habilitado. |
spring.cloud.azure.servicebus.connection-string | Corda | Valor da cadeia de conexão do Namespace do Barramento de Serviço. |
spring.cloud.azure.servicebus.custom-endpoint-address | Corda | O endereço de ponto de extremidade personalizado a ser usado ao se conectar ao Barramento de Serviço. |
spring.cloud.azure.servicebus.namespace | Corda | Valor do Namespace do Barramento de Serviço, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Corda | Nome de domínio de um valor de namespace do Barramento de Serviço do Azure. |
Nota
As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o associador do Barramento de Serviço de Fluxo do Azure no Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure.
ou o prefixo de spring.cloud.azure.servicebus.
.
O associador também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber mais sobre como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com funções relacionadas Data
, consulte a seção Uso básico de Spring Could Azure Resource Manager.
Propriedades de configuração de associação do Barramento de Serviço do Azure
As opções a seguir são divididas em quatro seções: Propriedades do Consumidor, Configurações Avançadas do Consumidor, Propriedades do Produtor e Configurações Avançadas de Produtor.
Propriedades do consumidor
Essas propriedades são expostas por meio de ServiceBusConsumerProperties
.
Nota
Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, o Barramento de Serviço do Azure Stream Binder do Spring Cloud dá suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Propriedades configuráveis do consumidor do spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Inadimplência | Descrição |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | booleano | falso | Se as mensagens com falha forem roteadas para o DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Inteiro | 1 | Mensagens simultâneas máximas que o cliente do processador do Barramento de Serviço deve processar. Quando a sessão está habilitada, ela se aplica a cada sessão. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Inteiro | zero | Número máximo de sessões simultâneas a serem processadas a qualquer momento. |
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session | Booleano | zero | Se a sessão está habilitada. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Inteiro | 0 | A contagem de pré-busca do cliente do processador do Barramento de Serviço. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | nenhum | O tipo da sub fila à qual se conectar. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duração | 5m | A quantidade de tempo para continuar renovando automaticamente o bloqueio. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | O modo de recebimento do cliente do processador do Barramento de Serviço. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Booleano | verdadeiro | Se as mensagens serão resolvidas automaticamente. Se definido como false, um cabeçalho de mensagem de Checkpointer será adicionado para permitir que os desenvolvedores resolvam mensagens manualmente. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes | long | 1024 | O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Duração | P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) | A duração após a qual a mensagem expira, começando de quando a mensagem é enviada para o Barramento de Serviço. |
Importante
Ao usar o ARM (Azure Resource Manager), você deve configurar a propriedade spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Para obter mais informações, consulte o exemplo servicebus-queue-binder-arm no GitHub.
Configuração avançada do consumidor
O de conexão
Propriedades do produtor
Essas propriedades são expostas por meio de ServiceBusProducerProperties
.
Nota
Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, o Barramento de Serviço do Azure Stream Binder do Spring Cloud dá suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Propriedades configuráveis do produtor do spring-cloud-azure-stream-binder-servicebus:
Propriedade | Tipo | Inadimplência | Descrição |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | booleano | falso | Sinalizador de alternância para sincronização do produtor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | Longas | 10000 | Valor de tempo limite para envio do produtor. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | zero | Tipo de entidade do Barramento de Serviço do produtor, necessário para o produtor de associação. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | long | 1024 | O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Duração | P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) | A duração após a qual a mensagem expira, começando de quando a mensagem é enviada para o Barramento de Serviço. |
Importante
Ao usar o produtor de associação, é necessário configurar a propriedade de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Configuração avançada do produtor
O de conexão
Uso básico
Enviar e receber mensagens de/para o Barramento de Serviço
Preencha as opções de configuração com informações de credencial.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Nota
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definir fornecedor e consumidor.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Suporte à chave de partição
O associador dá suporte de particionamento do Barramento de Serviço, permitindo a definição da chave de partição e da ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.
O Spring Cloud Stream fornece uma propriedade de expressão SpEL de chave de partição spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Por exemplo, definir essa propriedade como "'partitionKey-' + headers[<message-header-key>]"
e adicionar um cabeçalho chamado message-header-key. O Spring Cloud Stream usa o valor desse cabeçalho ao avaliar a expressão para atribuir uma chave de partição. O código a seguir fornece um produtor de exemplo:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Suporte à sessão
O associador dá suporte sessões de mensagem do Barramento de Serviço. A ID de sessão de uma mensagem pode ser definida por meio do cabeçalho da mensagem.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Nota
De acordo com de particionamento do Barramento de Serviço, a ID da sessão tem prioridade maior do que a chave de partição. Portanto, quando ambos os cabeçalhos ServiceBusMessageHeaders#SESSION_ID
e ServiceBusMessageHeaders#PARTITION_KEY
são definidos, o valor da ID da sessão é eventualmente usado para substituir o valor da chave de partição.
Manipular mensagens de erro
Manipular mensagens de erro de associação de saída
Por padrão, o Spring Integration cria um canal de erro global chamado
errorChannel
. Configure o ponto de extremidade de mensagem a seguir para lidar com a mensagem de erro de associação de saída.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Manipular mensagens de erro de associação de entrada
O Spring Cloud Stream Service Bus Binder dá suporte a duas soluções para lidar com erros para as associações de mensagens de entrada: o manipulador de erros e manipuladores do associador.
do manipulador de erros do
Binder: O manipulador de erros do associador padrão manipula a associação de entrada. Use esse manipulador para enviar mensagens com falha para a fila de mensagens mortas quando
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
estiver habilitado. Caso contrário, as mensagens com falha serão abandonadas. O manipulador de erros do associador é mutuamente exclusivo com outros manipuladores de erros fornecidos.do manipulador de erros do
: O Spring Cloud Stream expõe um mecanismo para que você forneça um manipulador de erros personalizado adicionando um
Consumer
que aceitaErrorMessage
instâncias. Para obter mais informações, consulte manipular mensagens de erro na documentação do Spring Cloud Stream.Manipulador de erros padrão de associação
Configure um único
Consumer
bean para consumir todas as mensagens de erro de associação de entrada. A função padrão a seguir assina cada canal de erro de associação de entrada:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.default.error-handler-definition
como o nome da função.Manipulador de erros específico à associação
Configure um
Consumer
bean para consumir as mensagens de erro de associação de entrada específicas. A função a seguir assina o canal de erro de associação de entrada específico com uma prioridade maior do que o manipulador de erros padrão de associação.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Você também precisa definir a propriedade
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
como o nome da função.
Cabeçalhos de mensagem do Barramento de Serviço
Para obter os cabeçalhos de mensagem básicos com suporte, consulte os cabeçalhos de mensagem do Barramento de Serviço seção do suporte do Spring Cloud Azure para o Spring Integration.
Nota
Ao definir a chave de partição, a prioridade do cabeçalho da mensagem é maior do que a propriedade Spring Cloud Stream. Portanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
entra em vigor somente quando nenhum dos cabeçalhos ServiceBusMessageHeaders#SESSION_ID
e ServiceBusMessageHeaders#PARTITION_KEY
estiver configurado.
Suporte a vários associados
A conexão com vários namespaces do Barramento de Serviço também tem suporte usando vários associadores. Este exemplo usa a cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também têm suporte, os usuários podem definir propriedades relacionadas nas configurações de ambiente de cada associador.
Para usar vários associadores do ServiceBus, configure as seguintes propriedades em seu arquivo application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Nota
O arquivo de aplicativo anterior mostra como configurar um único sondador padrão para aplicativo para todas as associações. Se você quiser configurar o poller para uma associação específica, poderá usar uma configuração como
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.precisamos definir dois fornecedores e dois consumidores
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Provisionamento de recursos
O associador do barramento de serviço dá suporte ao provisionamento de fila, tópico e assinatura, os usuários podem usar as propriedades a seguir para habilitar o provisionamento.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Nota
Os valores permitidos para tenant-id
são: common
, organizations
, consumers
ou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Personalizar propriedades do cliente do Barramento de Serviço
Os desenvolvedores podem usar AzureServiceClientBuilderCustomizer
para personalizar as propriedades do Cliente do Barramento de Serviço. O exemplo a seguir personaliza a propriedade sessionIdleTimeout
em ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Amostras
Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.