Partilhar via


Suporte do Spring Cloud Azure para o Spring Cloud Stream

Este artigo aplica-se a:✅ Versão 4.19.0 ✅ Versão 5.19.0

O Spring Cloud Stream é uma estrutura para a criação de microsserviços altamente escaláveis orientados a eventos conectados com sistemas de mensagens compartilhados.

A estrutura fornece um modelo de programação flexível baseado em expressões idiomáticas e melhores práticas Spring já estabelecidas e familiares. Essas práticas recomendadas incluem suporte para semântica pub/sub persistente, grupos de consumidores e partições com monitoração de estado.

As implementações atuais do fichário incluem:

Spring Cloud Stream Binder para Hubs de Eventos do Azure

Conceitos-chave

O Spring Cloud Stream Binder para Hubs de Eventos do Azure fornece a implementação de vinculação para a estrutura do Spring Cloud Stream. Esta implementação usa os adaptadores de canal do Spring Integration Event Hubs em sua base. Do ponto de vista do design, os Hubs de Eventos são semelhantes a Kafka. Além disso, os Hubs de Eventos podem ser acessados via Kafka API. Se o seu projeto tiver dependência total da API Kafka, você pode tentar Hub de Eventos com a API Kafka Sample

Grupo de consumidores

Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores do Apache Kafka, mas com uma lógica ligeiramente diferente. Enquanto Kafka armazena todos os deslocamentos confirmados no broker, você precisa armazenar offsets de mensagens de Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.

Suporte de particionamento

Os Hubs de Eventos fornecem um conceito de partição física semelhante ao Kafka. Mas, ao contrário do reequilíbrio automático entre consumidores e partições de Kafka, os Hubs de Eventos fornecem uma espécie de modo preventivo. A conta de armazenamento atua como uma locação para determinar qual consumidor possui qual partição. Quando um novo consumidor começa, ele tenta roubar algumas partições dos consumidores mais carregados para alcançar o equilíbrio da carga de trabalho.

Para especificar a estratégia de balanceamento de carga, as propriedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* são fornecidas. Para obter mais informações, consulte a seção propriedades do consumidor .

Suporte ao consumidor em lote

O fichário do Spring Cloud Azure Stream Event Hubs suporta recurso Spring Cloud Stream Batch Consumer.

Para trabalhar com o modo batch-consumer, defina a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode como true. Quando habilitada, uma mensagem com uma carga de uma lista de eventos em lote é recebida e passada para a função Consumer. Cada cabeçalho de mensagem também é convertido em uma lista, cujo conteúdo é o valor de cabeçalho associado analisado de cada evento. Os cabeçalhos comuns de ID de partição, ponteiro de verificação e últimas propriedades enfileiradas são apresentados como um único valor porque todo o lote de eventos compartilha o mesmo valor. Para obter mais informações, consulte a seção cabeçalhos de mensagem dos Hubs de Eventos de suporte do Spring Cloud Azure para o Spring Integration.

Observação

O cabeçalho do ponto de verificação só existe quando o modo de ponto de verificação MANUAL é usado.

O ponto de verificação do consumidor em lote suporta dois modos: BATCH e MANUAL. BATCH modo é um modo de ponto de verificação automático para verificar todo o lote de eventos juntos assim que o fichário os recebe. MANUAL modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer é passado para o cabeçalho da mensagem e os usuários podem usá-lo para fazer checkpointing.

Você pode especificar o tamanho do lote definindo as propriedades max-size e max-wait-time que têm um prefixo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. A propriedade max-size é necessária e a propriedade max-wait-time é opcional. Para obter mais informações, consulte a seção propriedades do consumidor .

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Como alternativa, você também pode usar o Spring Cloud Azure Stream Event Hubs Starter, conforme mostrado no exemplo a seguir para o Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuração

O fichário fornece as seguintes três partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.

Observação

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com o Microsoft Entra ID para verificar se a entidade de segurança recebeu a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.enabled Booleano Se um Hubs de Eventos do Azure está habilitado.
spring.cloud.azure.eventhubs.connection-string String Valor da cadeia de conexão de namespace de Hubs de Eventos.
spring.cloud.azure.eventhubs.namespace String Valor de namespace de Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Nome de domínio de um valor de Namespace de Hubs de Eventos do Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address String Endereço de ponto final personalizado.

Dica

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o fichário dos Hubs de Eventos do Azure Stream do Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.eventhubs..

O fichário também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com Data funções relacionadas, consulte a seção de uso básico de Spring Could Azure Resource Manager.

Propriedades de configuração do ponto de verificação

Esta seção contém as opções de configuração para o serviço de Blobs de Armazenamento, que é usado para manter a propriedade da partição e as informações do ponto de verificação.

Observação

A partir da versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente com o nome de spring.cloud.stream.bindings.binding-name.destination.

Propriedades configuráveis de ponto de verificação de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Se a criação de contêineres não existe, deve ser permitida.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Nome da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Chave de acesso da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Nome do recipiente de armazenamento.

Dica

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o armazenamento de pontos de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriedades de configuração de vinculação dos Hubs de Eventos do Azure

As opções a seguir estão divididas em quatro seções: Propriedades do consumidor, Configurações avançadas do consumidor, Propriedades do produtor e Configurações avançadas do produtor.

Propriedades de consumo

Estas propriedades são expostas através de EventHubsConsumerProperties.

Observação

Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Event Hubs oferece suporte à configuração de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Propriedades configuráveis pelo consumidor de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Modo de ponto de verificação usado quando o consumidor decide como enviar a mensagem de ponto de verificação
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Inteiro Decide a quantidade de mensagem para cada partição para fazer um ponto de verificação. Entrará em vigor somente quando PARTITION_COUNT modo de ponto de verificação for usado.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duração Decide o intervalo de tempo para fazer um ponto de verificação. Entrará em vigor somente quando TIME modo de ponto de verificação for usado.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Inteiro O número máximo de eventos em um lote. Necessário para o modo batch-consumer.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-tempo de espera Duração A duração máxima do consumo em lote. Entrará em vigor somente quando o modo batch-consumer estiver ativado e for opcional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duração A duração do intervalo de tempo para atualização.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy A estratégia de balanceamento de carga.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duração A duração do tempo após o qual a propriedade da partição expira.
spring.cloud.stream.eventhubs.bindings.binding-name.consumerpropriedades .track-last-enqueued-event-properties Booleano Se o processador de eventos deve solicitar informações sobre o último evento enfileirado em sua partição associada e controlar essas informações à medida que os eventos são recebidos.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Inteiro A contagem usada pelo consumidor para controlar o número de eventos que o consumidor do Hub de Eventos receberá ativamente e enfileirará localmente.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapeie com a chave como ID da partição e valores de StartPositionProperties O mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir no armazenamento de pontos de verificação. Este mapa é chaveado fora do ID da partição.

Observação

A configuração initial-partition-event-position aceita um map para especificar a posição inicial para cada hub de eventos. Assim, sua chave é o ID da partição, e o valor é de StartPositionProperties, que inclui propriedades de deslocamento, número de sequência, data e hora enfileirada e se inclusive. Por exemplo, você pode defini-lo como

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuração avançada do consumidor

Osde conexão de acima, ponto de verificaçãoe cliente comum do SDK do Azure personalização de suporte à configuração para cada consumidor de fichário, que você pode configurar com o prefixo .

Propriedades do produtor

Estas propriedades são expostas através de EventHubsProducerProperties.

Observação

Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Event Hubs oferece suporte à configuração de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync Booleano O sinalizador de switch para sincronização do produtor. Se verdadeiro, o produtor aguardará uma resposta após uma operação de envio.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout Longo A quantidade de tempo para aguardar uma resposta após uma operação de envio. Terá efeito somente quando um produtor de sincronização estiver habilitado.
Configuração avançada do produtor

O de conexão de acima e cliente comum do SDK do Azure personalização de suporte à configuração para cada produtor de fichário, que você pode configurar com o prefixo .

Utilização básica

Enviar e receber mensagens de/para Hubs de Eventos

  1. Preencha as opções de configuração com informações de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Observação

Os valores permitidos para tenant-id são: common, organizations, consumersou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe nodo locatário . Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definir fornecedor e consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Suporte de particionamento

Um PartitionSupplier com informações de partição fornecidas pelo usuário é criado para configurar as informações de partição sobre a mensagem a ser enviada. O fluxograma a seguir mostra o processo de obtenção de diferentes prioridades para o ID e a chave da partição:

Diagrama mostrando um fluxograma do processo de suporte ao particionamento.

Suporte ao consumidor em lote

  1. Forneça as opções de configuração de lote, conforme mostrado no exemplo a seguir:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definir fornecedor e consumidor.

    Para o modo de ponto de verificação como BATCH, você pode usar o código a seguir para enviar mensagens e consumir em lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Para o modo de ponto de verificação como MANUAL, você pode usar o código a seguir para enviar mensagens e consumir/ponto de verificação em lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Observação

No modo de consumo em lote, o tipo de conteúdo padrão do fichário do Spring Cloud Stream é application/json, portanto, verifique se a carga útil da mensagem está alinhada com o tipo de conteúdo. Por exemplo, ao usar o tipo de conteúdo padrão de application/json para receber mensagens com String carga útil, a carga deve ser JSON String, cercada de aspas duplas para o texto String original. Enquanto para text/plain tipo de conteúdo, ele pode ser um objeto String diretamente. Para obter mais informações, consulte Spring Cloud Stream Content Type Negotiation.

Manipular mensagens de erro

  • Manipular mensagens de erro de vinculação de saída

    Por padrão, o Spring Integration cria um canal de erro global chamado errorChannel. Configure o ponto de extremidade de mensagem a seguir para lidar com mensagens de erro de vinculação de saída.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Manipular mensagens de erro de vinculação de entrada

    O Spring Cloud Stream Event Hubs Binder suporta uma solução para lidar com erros para as ligações de mensagens de entrada: manipuladores de erros.

    Manipulador de erros:

    O Spring Cloud Stream expõe um mecanismo para você fornecer um manipulador de erros personalizado adicionando um Consumer que aceita instâncias ErrorMessage. Para obter mais informações, consulte Manipular mensagens de erro na documentação do Spring Cloud Stream.

    • Manipulador de erro de vinculação-padrão

      Configure um único bean de Consumer para consumir todas as mensagens de erro de vinculação de entrada. A seguinte função padrão se inscreve em cada canal de erro de vinculação de entrada:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.default.error-handler-definition para o nome da função.

    • Manipulador de erros específico da vinculação

      Configure um Consumer bean para consumir as mensagens de erro de vinculação de entrada específicas. A função a seguir se inscreve no canal de erro de vinculação de entrada específico e tem uma prioridade maior do que o manipulador de erro vinculante-padrão:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition para o nome da função.

Cabeçalhos de mensagem dos Hubs de Eventos

Para obter os cabeçalhos de mensagem básicos suportados, consulte a seção cabeçalhos de mensagem dos Hubs de Eventos de suporte do Spring Cloud Azure para o Spring Integration.

Suporte a vários fichários

A conexão com vários namespaces de Hubs de Eventos também é suportada usando vários fichários. Este exemplo usa uma cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são suportadas. Você pode definir propriedades relacionadas nas configurações de ambiente de cada fichário.

  1. Para usar vários fichários com Hubs de Eventos, configure as seguintes propriedades no arquivo application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Observação

    O arquivo de aplicativo anterior mostra como configurar um único poller padrão para o aplicativo para todas as associações. Se quiser configurar o poller para uma ligação específica, você pode usar uma configuração como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Precisamos definir dois fornecedores e dois consumidores:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisionamento de recursos

O fichário de Hubs de Eventos oferece suporte ao provisionamento de hub de eventos e grupo de consumidores, os usuários podem usar as seguintes propriedades para habilitar o provisionamento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Observação

Os valores permitidos para tenant-id são: common, organizations, consumersou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe nodo locatário . Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

Amostras

Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.

Spring Cloud Stream Binder para o Barramento de Serviço do Azure

Conceitos-chave

O Spring Cloud Stream Binder for Azure Service Bus fornece a implementação de vinculação para o Spring Cloud Stream Framework. Esta implementação usa adaptadores de canal do Spring Integration Service Bus em sua base.

Mensagem agendada

Este fichário suporta o envio de mensagens para um tópico para processamento atrasado. Os usuários podem enviar mensagens agendadas com cabeçalho x-delay expressando em milissegundos um tempo de atraso para a mensagem. A mensagem será entregue aos respetivos tópicos após x-delay milissegundos.

Grupo de consumidores

O Service Bus Topic fornece suporte semelhante ao grupo de consumidores do Apache Kafka, mas com uma lógica ligeiramente diferente. Este aglutinante baseia-se na Subscription de um tópico para atuar como um grupo de consumidores.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Como alternativa, você também pode usar o Spring Cloud Azure Stream Service Bus Starter, conforme mostrado no exemplo a seguir para o Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuração

O fichário fornece as seguintes duas partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.

Observação

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com o Microsoft Entra ID para verificar se a entidade de segurança recebeu a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Descrição
spring.cloud.azure.servicebus.enabled Booleano Se um Barramento de Serviço do Azure está habilitado.
spring.cloud.azure.servicebus.connection-string String Valor da cadeia de conexão do Namespace do Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address String O endereço de ponto de extremidade personalizado a ser usado ao se conectar ao Service Bus.
spring.cloud.azure.servicebus.namespace String Valor de Namespace do Service Bus, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Nome de domínio de um valor de Namespace do Barramento de Serviço do Azure.

Observação

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o fichário do Spring Cloud Azure Stream Service Bus. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.servicebus..

O fichário também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com Data funções relacionadas, consulte a seção de uso básico de Spring Could Azure Resource Manager.

Propriedades de configuração de vinculação do Barramento de Serviço do Azure

As opções a seguir estão divididas em quatro seções: Propriedades do consumidor, Configurações avançadas do consumidor, Propriedades do produtor e Configurações avançadas do produtor.

Propriedades de consumo

Estas propriedades são expostas através de ServiceBusConsumerProperties.

Observação

Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Service Bus suporta a definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Propriedades configuráveis pelo consumidor de spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Inadimplência Descrição
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected Booleano falso Se as mensagens com falha forem roteadas para o DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Inteiro 1 Max mensagens simultâneas que o cliente do processador do Service Bus deve processar. Quando a sessão está ativada, aplica-se a cada sessão.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Inteiro nulo Número máximo de sessões simultâneas a serem processadas a qualquer momento.
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session Booleano nulo Se a sessão está habilitada.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Inteiro 0 A contagem de pré-busca do cliente do processador do Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.subfila Subfila nenhum O tipo de subfila à qual se conectar.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duração 5 metros A quantidade de tempo para continuar a renovação automática do bloqueio.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock O modo de recebimento do cliente do processador do Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booleano verdadeiro Se as mensagens devem ser liquidadas automaticamente. Se definido como false, um cabeçalho de mensagem de Checkpointer será adicionado para permitir que os desenvolvedores liquidem as mensagens manualmente.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes Longo 1024 O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Duração P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) A duração após a qual a mensagem expira, a partir de quando a mensagem é enviada para o Service Bus.

Importante

Ao usar o Azure Resource Manager (ARM), você deve configurar a propriedade spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Para obter mais informações, consulte o exemplo de servicebus-queue-binder-arm no GitHub.

Configuração avançada do consumidor

O de conexão de acima e cliente comum do SDK do Azure a personalização de configuração para cada consumidor de fichário, que você pode configurar com o prefixo .

Propriedades do produtor

Estas propriedades são expostas através de ServiceBusProducerProperties.

Observação

Para evitar repetições, desde as versões 4.19.0 e 5.19.0, o Spring Cloud Azure Stream Binder Service Bus suporta a definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Inadimplência Descrição
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync Booleano falso Sinalizador de switch para sincronização do produtor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout Longo 10000 Valor de tempo limite para envio do produtor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nulo Tipo de entidade do Service Bus do produtor, necessário para o produtor de ligação.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes Longo 1024 O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Duração P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) A duração após a qual a mensagem expira, a partir de quando a mensagem é enviada para o Service Bus.

Importante

Ao usar o produtor de ligação, a propriedade de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type deve ser configurada.

Configuração avançada do produtor

O de conexão de acima e cliente comum do SDK do Azure personalização de suporte à configuração para cada produtor de fichário, que você pode configurar com o prefixo .

Utilização básica

Enviar e receber mensagens de/para o Service Bus

  1. Preencha as opções de configuração com informações de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Observação

Os valores permitidos para tenant-id são: common, organizations, consumersou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe nodo locatário . Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definir fornecedor e consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Suporte de chave de partição

O fichário suporta de particionamento do Service Bus, permitindo definir a chave de partição e o ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.

O Spring Cloud Stream fornece uma propriedade de expressão SpEL de chave de partição spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Por exemplo, definir essa propriedade como "'partitionKey-' + headers[<message-header-key>]" e adicionar um cabeçalho chamado message-header-key. O Spring Cloud Stream usa o valor desse cabeçalho ao avaliar a expressão para atribuir uma chave de partição. O código a seguir fornece um exemplo de produtor:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Suporte de sessão

O fichário suporta sessões de mensagens do Service Bus. O ID de sessão de uma mensagem pode ser definido através do cabeçalho da mensagem.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Observação

De acordo com de particionamento do Service Bus, o ID da sessão tem prioridade maior do que a chave de partição. Assim, quando os cabeçalhos ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY são definidos, o valor do ID da sessão é eventualmente usado para substituir o valor da chave de partição.

Manipular mensagens de erro

  • Manipular mensagens de erro de vinculação de saída

    Por padrão, o Spring Integration cria um canal de erro global chamado errorChannel. Configure o ponto de extremidade da seguinte mensagem para manipular a mensagem de erro de vinculação de saída.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Manipular mensagens de erro de vinculação de entrada

    O Spring Cloud Stream Service Bus Binder suporta duas soluções para lidar com erros para as ligações de mensagens de entrada: o manipulador de erros do fichário e os manipuladores.

    Manipulador de erro do fichário:

    O manipulador de erros de fichário padrão lida com a associação de entrada. Use esse manipulador para enviar mensagens com falha para a fila de mensagens mortas quando spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected estiver habilitado. Caso contrário, as mensagens com falha serão abandonadas. O manipulador de erros do fichário é mutuamente exclusivo com outros manipuladores de erro fornecidos.

    Manipulador de erros:

    O Spring Cloud Stream expõe um mecanismo para você fornecer um manipulador de erros personalizado adicionando um Consumer que aceita instâncias ErrorMessage. Para obter mais informações, consulte Manipular mensagens de erro na documentação do Spring Cloud Stream.

    • Manipulador de erro de vinculação-padrão

      Configure um único bean de Consumer para consumir todas as mensagens de erro de vinculação de entrada. A seguinte função padrão se inscreve em cada canal de erro de vinculação de entrada:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.default.error-handler-definition para o nome da função.

    • Manipulador de erros específico da vinculação

      Configure um Consumer bean para consumir as mensagens de erro de vinculação de entrada específicas. A função a seguir se inscreve no canal de erro de vinculação de entrada específico com uma prioridade maior do que o manipulador de erro de vinculação padrão.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition para o nome da função.

Cabeçalhos de mensagens do Barramento de Serviço

Para obter os cabeçalhos de mensagem básicos suportados, consulte a seção cabeçalhos de mensagem do Service Bus do suporte do Spring Cloud Azure para o Spring Integration.

Observação

Ao definir a chave de partição, a prioridade do cabeçalho da mensagem é maior do que a propriedade do Spring Cloud Stream. Portanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression só entra em vigor quando nenhum dos cabeçalhos ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY estiver configurado.

Suporte a vários fichários

A conexão com vários namespaces do Service Bus também é suportada usando vários fichários. Este exemplo usa a cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são suportadas, os usuários podem definir propriedades relacionadas nas configurações de ambiente de cada fichário.

  1. Para usar vários fichários do ServiceBus, configure as seguintes propriedades no arquivo application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Observação

    O arquivo de aplicativo anterior mostra como configurar um único poller padrão para o aplicativo para todas as associações. Se quiser configurar o poller para uma ligação específica, você pode usar uma configuração como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. precisamos definir dois fornecedores e dois consumidores

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisionamento de recursos

O fichário do barramento de serviço suporta o provisionamento de fila, tópico e assinatura, os usuários podem usar as seguintes propriedades para habilitar o provisionamento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Observação

Os valores permitidos para tenant-id são: common, organizations, consumersou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe nodo locatário . Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

Personalizar as propriedades do cliente do Service Bus

Os desenvolvedores podem usar AzureServiceClientBuilderCustomizer para personalizar as propriedades do Service Bus Client. O exemplo a seguir personaliza a propriedade sessionIdleTimeout no ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Amostras

Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.