Compartilhar via


Suporte do Spring Cloud Azure para Spring Cloud Stream

Este artigo se aplica a:✅ versão 4.19.0 ✅ versão 5.18.0

O Spring Cloud Stream é uma estrutura para criar microsserviços altamente escalonáveis controlados por eventos conectados com sistemas de mensagens compartilhadas.

A estrutura fornece um modelo de programação flexível baseado em linguagens spring já estabelecidas e familiares e práticas recomendadas. Essas práticas recomendadas incluem suporte para semântica de pub/sub persistente, grupos de consumidores e partições com estado.

As implementações atuais do associador incluem:

Spring Cloud Stream Binder para Hubs de Eventos do Azure

Principais conceitos

O Spring Cloud Stream Binder para Hubs de Eventos do Azure fornece a implementação de associação para a estrutura do Spring Cloud Stream. Essa implementação usa adaptadores de canal dos Hubs de Eventos do Spring Integration em sua base. Do ponto de vista do design, os Hubs de Eventos são semelhantes ao Kafka. Além disso, os Hubs de Eventos podem ser acessados por meio da API do Kafka. Se o projeto tiver uma dependência apertada da API do Kafka, você poderá tentar Hub de Eventos com o Exemplo de API do Kafka

Grupo de consumidores

Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores como o Apache Kafka, mas com uma lógica ligeiramente diferente. Embora o Kafka armazene todos os deslocamentos confirmados no agente, você precisa armazenar deslocamentos de mensagens dos Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.

Suporte ao particionamento

Os Hubs de Eventos fornecem um conceito semelhante de partição física como Kafka. Mas, ao contrário do reequilíbrio automático do Kafka entre consumidores e partições, os Hubs de Eventos fornecem uma espécie de modo preemptivo. A conta de armazenamento atua como uma concessão para determinar qual consumidor possui qual partição. Quando um novo consumidor é iniciado, ele tenta roubar algumas partições dos consumidores mais carregados para obter o equilíbrio da carga de trabalho.

Para especificar a estratégia de balanceamento de carga, as propriedades de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* são fornecidas. Para obter mais informações, consulte a seção propriedades do consumidor .

Suporte ao consumidor do lote

O associador de Hubs de Eventos do Spring Cloud Azure Stream dá suporte recurso consumidor do Lote do Spring Cloud Stream.

Para trabalhar com o modo de consumidor em lote, defina a propriedade spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode como true. Quando habilitada, uma mensagem com uma carga de uma lista de eventos em lote é recebida e passada para a função Consumer. Cada cabeçalho de mensagem também é convertido em uma lista, da qual o conteúdo é o valor de cabeçalho associado analisado de cada evento. Os cabeçalhos comuns da ID da partição, do ponto de verificação e das últimas propriedades enfileiradas são apresentados como um único valor porque todo o lote de eventos compartilha o mesmo valor. Para obter mais informações, consulte os cabeçalhos de mensagens dos Hubs de Eventos seção de suporte do Spring Cloud Azure para o Spring Integration.

Nota

O cabeçalho de ponto de verificação só existe quando o modo de ponto de verificação MANUAL é usado.

O ponto de verificação do consumidor em lotes dá suporte a dois modos: BATCH e MANUAL. BATCH modo é um modo de ponto de verificação automático para verificar o lote inteiro de eventos juntos depois que o associador os recebe. MANUAL modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer é passado para o cabeçalho da mensagem e os usuários podem usá-lo para fazer o ponto de verificação.

Você pode especificar o tamanho do lote definindo as propriedades max-size e max-wait-time que têm um prefixo de spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. A propriedade max-size é necessária e a propriedade max-wait-time é opcional. Para obter mais informações, consulte a seção propriedades do consumidor .

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Como alternativa, você também pode usar o Spring Cloud Azure Stream Event Hubs Starter, conforme mostrado no exemplo a seguir para o Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuração

O associador fornece as três partes a seguir das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.enabled booleano Se um Hub de Eventos do Azure está habilitado.
spring.cloud.azure.eventhubs.connection-string Corda Valor da cadeia de conexão do Namespace dos Hubs de Eventos.
spring.cloud.azure.eventhubs.namespace Corda Valor do Namespace dos Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corda Nome de domínio de um valor de namespace dos Hubs de Eventos do Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Corda Endereço do ponto de extremidade personalizado.

Ponta

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o associador dos Hubs de Eventos do Azure Stream do Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.eventhubs..

O associador também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber mais sobre como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com funções relacionadas Data, consulte a seção Uso básico de Spring Could Azure Resource Manager.

Propriedades de configuração de ponto de verificação

Esta seção contém as opções de configuração para o serviço Blobs de Armazenamento, que é usado para persistir a propriedade da partição e informações de ponto de verificação.

Nota

Na versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente com o nome de spring.cloud.stream.bindings.binding-name.destination.

Propriedades configuráveis de ponto de verificação do spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Se deseja permitir a criação de contêineres se não existir.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corda Nome da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corda Chave de acesso da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corda Nome do contêiner de armazenamento.

Ponta

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o repositório de ponto de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriedades de configuração de associação dos Hubs de Eventos do Azure

As opções a seguir são divididas em quatro seções: Propriedades do Consumidor, Configurações Avançadas do Consumidor, Propriedades do Produtor e Configurações Avançadas de Produtor.

Propriedades do consumidor

Essas propriedades são expostas por meio de EventHubsConsumerProperties.

Nota

Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, os Hubs de Eventos do Azure Stream Binder do Spring Cloud dão suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Propriedades configuráveis do consumidor de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Modo de ponto de verificação usado quando o consumidor decide como colocar a mensagem de ponto de verificação
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Inteiro Decide a quantidade de mensagem para cada partição fazer um ponto de verificação. Entrará em vigor somente quando PARTITION_COUNT modo de ponto de verificação for usado.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duração Decide o intervalo de tempo para fazer um ponto de verificação. Entrará em vigor somente quando TIME modo de ponto de verificação for usado.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Inteiro O número máximo de eventos em um lote. Necessário para o modo de consumidor em lote.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duração A duração máxima do tempo para consumo em lote. Entrará em vigor somente quando o modo de consumidor em lote estiver habilitado e for opcional.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duração A duração do tempo de intervalo para atualização.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy A estratégia de balanceamento de carga.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duração A duração do tempo após a qual a propriedade da partição expira.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleano Se o processador de eventos deve solicitar informações sobre o último evento enfileirado em sua partição associada e acompanhar essas informações à medida que os eventos são recebidos.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Inteiro A contagem usada pelo consumidor para controlar o número de eventos que o consumidor do Hub de Eventos receberá e enfileirará localmente.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapeie com a chave como a ID da partição e os valores de StartPositionProperties O mapa que contém a posição do evento a ser usada para cada partição se um ponto de verificação para a partição não existir no repositório de pontos de verificação. Esse mapa é retirado da ID da partição.

Nota

A configuração de initial-partition-event-position aceita uma map para especificar a posição inicial para cada hub de eventos. Portanto, sua chave é a ID da partição e o valor é de StartPositionProperties, que inclui propriedades de deslocamento, número de sequência, data e inclusão. Por exemplo, você pode defini-lo como

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuração avançada do consumidor

Ode conexão acima, de ponto de verificação e cliente do SDK do Azure comum a personalização de suporte para cada consumidor associador, que você pode configurar com o prefixo .

Propriedades do produtor

Essas propriedades são expostas por meio de EventHubsProducerProperties.

Nota

Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, os Hubs de Eventos do Azure Stream Binder do Spring Cloud dão suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Propriedades configuráveis do produtor de spring-cloud-azure-stream-binder-eventhubs:

Propriedade Tipo Descrição
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booleano O sinalizador de comutador para sincronização do produtor. Se for true, o produtor aguardará uma resposta após uma operação de envio.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout Longas A quantidade de tempo para aguardar uma resposta após uma operação de envio. Entrará em vigor somente quando um produtor de sincronização estiver habilitado.
Configuração avançada do produtor

O de conexão acima e cliente do SDK do Azure comum a personalização de suporte para cada produtor associador, que você pode configurar com o prefixo .

Uso básico

Enviar e receber mensagens de/para Hubs de Eventos

  1. Preencha as opções de configuração com informações de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definir fornecedor e consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Suporte ao particionamento

Um PartitionSupplier com informações de partição fornecidas pelo usuário é criado para configurar as informações de partição sobre a mensagem a ser enviada. O fluxograma a seguir mostra o processo de obtenção de prioridades diferentes para a ID de partição e a chave:

Diagrama mostrando um fluxograma do processo de suporte de particionamento.

Suporte ao consumidor do lote

  1. Forneça as opções de configuração em lote, conforme mostrado no exemplo a seguir:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definir fornecedor e consumidor.

    Para o modo de ponto de verificação como BATCH, você pode usar o código a seguir para enviar mensagens e consumir em lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Para o modo de ponto de verificação como MANUAL, você pode usar o código a seguir para enviar mensagens e consumir/ponto de verificação em lotes.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nota

No modo de consumo em lote, o tipo de conteúdo padrão do associador spring cloud stream é application/json, portanto, verifique se o conteúdo da mensagem está alinhado com o tipo de conteúdo. Por exemplo, ao usar o tipo de conteúdo padrão de application/json para receber mensagens com String conteúdo, o conteúdo deve ser JSON String, cercado de aspas duplas para o texto de String original. Embora para text/plain tipo de conteúdo, ele pode ser um objeto String diretamente. Para obter mais informações, consulte de negociação de tipo de conteúdo do Spring Cloud Stream.

Manipular mensagens de erro

  • do Azure 5.x do Spring Cloud
  • do Azure 4.x do Spring Cloud
  • Manipular mensagens de erro de associação de saída

    Por padrão, o Spring Integration cria um canal de erro global chamado errorChannel. Configure o ponto de extremidade de mensagem a seguir para lidar com mensagens de erro de associação de saída.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Manipular mensagens de erro de associação de entrada

    O Spring Cloud Stream Event Hubs Binder dá suporte a uma solução para lidar com erros para as associações de mensagens de entrada: manipuladores de erros.

    do Manipulador de Erros:

    O Spring Cloud Stream expõe um mecanismo para que você forneça um manipulador de erros personalizado adicionando um Consumer que aceita ErrorMessage instâncias. Para obter mais informações, consulte manipular mensagens de erro na documentação do Spring Cloud Stream.

    • Manipulador de erros padrão de associação

      Configure um único Consumer bean para consumir todas as mensagens de erro de associação de entrada. A função padrão a seguir assina cada canal de erro de associação de entrada:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.default.error-handler-definition como o nome da função.

    • Manipulador de erros específico à associação

      Configure um Consumer bean para consumir as mensagens de erro de associação de entrada específicas. A função a seguir assina o canal de erro de associação de entrada específico e tem uma prioridade maior do que o manipulador de erros padrão de associação:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition como o nome da função.

Cabeçalhos de mensagem dos Hubs de Eventos

Para obter os cabeçalhos de mensagem básicos com suporte, consulte os cabeçalhos de mensagens dos Hubs de Eventos seção de suporte do Spring Cloud Azure para o Spring Integration.

Suporte a vários associados

A conexão com vários namespaces de Hubs de Eventos também tem suporte usando vários associadores. Este exemplo usa uma cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também são compatíveis. Você pode definir propriedades relacionadas nas configurações de ambiente de cada associador.

  1. Para usar vários associadores com Hubs de Eventos, configure as seguintes propriedades em seu arquivo de application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    O arquivo de aplicativo anterior mostra como configurar um único sondador padrão para aplicativo para todas as associações. Se você quiser configurar o poller para uma associação específica, poderá usar uma configuração como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Precisamos definir dois fornecedores e dois consumidores:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisionamento de recursos

O associador de Hubs de Eventos dá suporte ao provisionamento do hub de eventos e do grupo de consumidores, os usuários podem usar as propriedades a seguir para habilitar o provisionamento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

Amostras

Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.

Spring Cloud Stream Binder para Barramento de Serviço do Azure

Principais conceitos

O Spring Cloud Stream Binder para Barramento de Serviço do Azure fornece a implementação de associação para o Spring Cloud Stream Framework. Essa implementação usa adaptadores de canal do Barramento de Serviço do Spring Integration em sua base.

Mensagem agendada

Esse associador dá suporte ao envio de mensagens para um tópico para processamento atrasado. Os usuários podem enviar mensagens agendadas com cabeçalho x-delay expressando em milissegundos um tempo de atraso para a mensagem. A mensagem será entregue aos respectivos tópicos após x-delay milissegundos.

Grupo de consumidores

O Tópico do Barramento de Serviço fornece suporte semelhante ao grupo de consumidores como o Apache Kafka, mas com uma lógica ligeiramente diferente. Esse associador depende de Subscription de um tópico para atuar como um grupo de consumidores.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Como alternativa, você também pode usar o Spring Cloud Azure Stream Service Bus Starter, conforme mostrado no exemplo a seguir para o Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuração

O associador fornece as duas seguintes partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão do spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Descrição
spring.cloud.azure.servicebus.enabled booleano Se um Barramento de Serviço do Azure está habilitado.
spring.cloud.azure.servicebus.connection-string Corda Valor da cadeia de conexão do Namespace do Barramento de Serviço.
spring.cloud.azure.servicebus.custom-endpoint-address Corda O endereço de ponto de extremidade personalizado a ser usado ao se conectar ao Barramento de Serviço.
spring.cloud.azure.servicebus.namespace Corda Valor do Namespace do Barramento de Serviço, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corda Nome de domínio de um valor de namespace do Barramento de Serviço do Azure.

Nota

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o associador do Barramento de Serviço de Fluxo do Azure no Spring Cloud. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.servicebus..

O associador também dá suporte a Spring Could Azure Resource Manager por padrão. Para saber mais sobre como recuperar a cadeia de conexão com entidades de segurança que não são concedidas com funções relacionadas Data, consulte a seção Uso básico de Spring Could Azure Resource Manager.

Propriedades de configuração de associação do Barramento de Serviço do Azure

As opções a seguir são divididas em quatro seções: Propriedades do Consumidor, Configurações Avançadas do Consumidor, Propriedades do Produtor e Configurações Avançadas de Produtor.

Propriedades do consumidor

Essas propriedades são expostas por meio de ServiceBusConsumerProperties.

Nota

Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, o Barramento de Serviço do Azure Stream Binder do Spring Cloud dá suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Propriedades configuráveis do consumidor do spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Inadimplência Descrição
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booleano falso Se as mensagens com falha forem roteadas para o DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Inteiro 1 Mensagens simultâneas máximas que o cliente do processador do Barramento de Serviço deve processar. Quando a sessão está habilitada, ela se aplica a cada sessão.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Inteiro zero Número máximo de sessões simultâneas a serem processadas a qualquer momento.
spring.cloud.stream.servicebus.bindings.binding-name.consumerhabilitado para .session Booleano zero Se a sessão está habilitada.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Inteiro 0 A contagem de pré-busca do cliente do processador do Barramento de Serviço.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue nenhum O tipo da sub fila à qual se conectar.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duração 5m A quantidade de tempo para continuar renovando automaticamente o bloqueio.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock O modo de recebimento do cliente do processador do Barramento de Serviço.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booleano verdadeiro Se as mensagens serão resolvidas automaticamente. Se definido como false, um cabeçalho de mensagem de Checkpointer será adicionado para permitir que os desenvolvedores resolvam mensagens manualmente.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes long 1024 O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Duração P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) A duração após a qual a mensagem expira, começando de quando a mensagem é enviada para o Barramento de Serviço.

Importante

Ao usar o ARM (Azure Resource Manager), você deve configurar a propriedade spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Para obter mais informações, consulte o exemplo servicebus-queue-binder-arm no GitHub.

Configuração avançada do consumidor

O de conexão acima e cliente do SDK do Azure comum a personalização de suporte para cada consumidor associador, que você pode configurar com o prefixo .

Propriedades do produtor

Essas propriedades são expostas por meio de ServiceBusProducerProperties.

Nota

Para evitar repetição, uma vez que as versões 4.19.0 e 5.18.0, o Barramento de Serviço do Azure Stream Binder do Spring Cloud dá suporte à definição de valores para todos os canais, no formato de spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Propriedades configuráveis do produtor do spring-cloud-azure-stream-binder-servicebus:

Propriedade Tipo Inadimplência Descrição
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booleano falso Sinalizador de alternância para sincronização do produtor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout Longas 10000 Valor de tempo limite para envio do produtor.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType zero Tipo de entidade do Barramento de Serviço do produtor, necessário para o produtor de associação.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes long 1024 O tamanho máximo da fila/tópico em megabytes, que é o tamanho da memória alocada para a fila/tópico.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Duração P10675199DT2H48M5.4775807S. (10675199 dias, 2 horas, 48 minutos, 5 segundos e 477 milissegundos) A duração após a qual a mensagem expira, começando de quando a mensagem é enviada para o Barramento de Serviço.

Importante

Ao usar o produtor de associação, é necessário configurar a propriedade de spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Configuração avançada do produtor

O de conexão acima e cliente do SDK do Azure comum a personalização de suporte para cada produtor associador, que você pode configurar com o prefixo .

Uso básico

Enviar e receber mensagens de/para o Barramento de Serviço

  1. Preencha as opções de configuração com informações de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definir fornecedor e consumidor.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Suporte à chave de partição

O associador dá suporte de particionamento do Barramento de Serviço, permitindo a definição da chave de partição e da ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.

O Spring Cloud Stream fornece uma propriedade de expressão SpEL de chave de partição spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Por exemplo, definir essa propriedade como "'partitionKey-' + headers[<message-header-key>]" e adicionar um cabeçalho chamado message-header-key. O Spring Cloud Stream usa o valor desse cabeçalho ao avaliar a expressão para atribuir uma chave de partição. O código a seguir fornece um produtor de exemplo:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Suporte à sessão

O associador dá suporte sessões de mensagem do Barramento de Serviço. A ID de sessão de uma mensagem pode ser definida por meio do cabeçalho da mensagem.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nota

De acordo com de particionamento do Barramento de Serviço, a ID da sessão tem prioridade maior do que a chave de partição. Portanto, quando ambos os cabeçalhos ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY são definidos, o valor da ID da sessão é eventualmente usado para substituir o valor da chave de partição.

Manipular mensagens de erro

  • do Azure 5.x do Spring Cloud
  • do Azure 4.x do Spring Cloud
  • Manipular mensagens de erro de associação de saída

    Por padrão, o Spring Integration cria um canal de erro global chamado errorChannel. Configure o ponto de extremidade de mensagem a seguir para lidar com a mensagem de erro de associação de saída.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Manipular mensagens de erro de associação de entrada

    O Spring Cloud Stream Service Bus Binder dá suporte a duas soluções para lidar com erros para as associações de mensagens de entrada: o manipulador de erros e manipuladores do associador.

    do manipulador de erros do Binder:

    O manipulador de erros do associador padrão manipula a associação de entrada. Use esse manipulador para enviar mensagens com falha para a fila de mensagens mortas quando spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected estiver habilitado. Caso contrário, as mensagens com falha serão abandonadas. O manipulador de erros do associador é mutuamente exclusivo com outros manipuladores de erros fornecidos.

    do manipulador de erros do :

    O Spring Cloud Stream expõe um mecanismo para que você forneça um manipulador de erros personalizado adicionando um Consumer que aceita ErrorMessage instâncias. Para obter mais informações, consulte manipular mensagens de erro na documentação do Spring Cloud Stream.

    • Manipulador de erros padrão de associação

      Configure um único Consumer bean para consumir todas as mensagens de erro de associação de entrada. A função padrão a seguir assina cada canal de erro de associação de entrada:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.default.error-handler-definition como o nome da função.

    • Manipulador de erros específico à associação

      Configure um Consumer bean para consumir as mensagens de erro de associação de entrada específicas. A função a seguir assina o canal de erro de associação de entrada específico com uma prioridade maior do que o manipulador de erros padrão de associação.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Você também precisa definir a propriedade spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition como o nome da função.

Cabeçalhos de mensagem do Barramento de Serviço

Para obter os cabeçalhos de mensagem básicos com suporte, consulte os cabeçalhos de mensagem do Barramento de Serviço seção do suporte do Spring Cloud Azure para o Spring Integration.

Nota

Ao definir a chave de partição, a prioridade do cabeçalho da mensagem é maior do que a propriedade Spring Cloud Stream. Portanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression entra em vigor somente quando nenhum dos cabeçalhos ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY estiver configurado.

Suporte a vários associados

A conexão com vários namespaces do Barramento de Serviço também tem suporte usando vários associadores. Este exemplo usa a cadeia de conexão como exemplo. Credenciais de entidades de serviço e identidades gerenciadas também têm suporte, os usuários podem definir propriedades relacionadas nas configurações de ambiente de cada associador.

  1. Para usar vários associadores do ServiceBus, configure as seguintes propriedades em seu arquivo application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    O arquivo de aplicativo anterior mostra como configurar um único sondador padrão para aplicativo para todas as associações. Se você quiser configurar o poller para uma associação específica, poderá usar uma configuração como spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. precisamos definir dois fornecedores e dois consumidores

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisionamento de recursos

O associador do barramento de serviço dá suporte ao provisionamento de fila, tópico e assinatura, os usuários podem usar as propriedades a seguir para habilitar o provisionamento.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

Personalizar propriedades do cliente do Barramento de Serviço

Os desenvolvedores podem usar AzureServiceClientBuilderCustomizer para personalizar as propriedades do Cliente do Barramento de Serviço. O exemplo a seguir personaliza a propriedade sessionIdleTimeout em ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Amostras

Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.