Condividi tramite


Supporto di Spring Cloud Azure per Spring Cloud Stream

Questo articolo si applica a:✅ versione 4.19.0 ✅ versione 5.19.0

Spring Cloud Stream è un framework per la creazione di microservizi altamente scalabili basati su eventi connessi con sistemi di messaggistica condivisa.

Il framework fornisce un modello di programmazione flessibile basato su idiomi e procedure consigliate Spring già stabilite e familiari. Queste procedure consigliate includono il supporto per semantica pub/sub persistente, gruppi di consumer e partizioni con stato.

Le implementazioni correnti del gestore di associazione includono:

Spring Cloud Stream Binder per Hub eventi di Azure

Concetti chiave

Spring Cloud Stream Binder per Hub eventi di Azure fornisce l'implementazione dell'associazione per il framework Spring Cloud Stream. Questa implementazione usa Spring Integration Event Hubs Channel Adapters alla sua base. Dal punto di vista della progettazione, Hub eventi è simile a Kafka. È anche possibile accedere a Hub eventi tramite l'API Kafka. Se il progetto ha una dipendenza stretta dall'API Kafka, è possibile provare Hub eventi con l'esempio di API Kafka

Gruppo di consumer

Hub eventi offre un supporto simile al gruppo di consumer di Apache Kafka, ma con logica leggermente diversa. Mentre Kafka archivia tutti gli offset di cui è stato eseguito il commit nel broker, è necessario archiviare gli offset dei messaggi di Hub eventi elaborati manualmente. Hub eventi SDK fornisce la funzione per archiviare tali offset all'interno di Archiviazione di Azure.

Supporto del partizionamento

Hub eventi offre un concetto simile a quello della partizione fisica di Kafka. Tuttavia, a differenza del ribilanciamento automatico di Kafka tra consumer e partizioni, Hub eventi offre una modalità preemptive. L'account di archiviazione funge da lease per determinare quale consumer possiede la partizione. All'avvio di un nuovo consumer, tenta di rubare alcune partizioni dai consumer più caricati per ottenere il bilanciamento del carico di lavoro.

Per specificare la strategia di bilanciamento del carico, vengono fornite proprietà di spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Per altre informazioni, vedere la sezione proprietà consumer.

Supporto consumer batch

Lo strumento di associazione spring cloud di Hub eventi di Flusso di Azure supporta funzionalità consumer batch di Spring Cloud Stream.

Per usare la modalità batch-consumer, impostare la proprietà spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode su true. Se abilitata, viene ricevuto un messaggio con un payload di un elenco di eventi in batch e passato alla funzione Consumer. Ogni intestazione di messaggio viene convertita anche in un elenco, di cui il contenuto è il valore di intestazione associato analizzato da ogni evento. Le intestazioni comuni di ID partizione, checkpointer e ultime proprietà accodate vengono presentate come un singolo valore perché l'intero batch di eventi condivide lo stesso valore. Per altre informazioni, vedere la sezione intestazioni dei messaggi di Hub eventi di Spring Cloud Azure per Spring Integration.

Nota

L'intestazione del checkpoint esiste solo quando viene usata la modalità di checkpoint MANUAL.

Il checkpoint del consumer batch supporta due modalità: BATCH e MANUAL. BATCH modalità è una modalità di checkpoint automatico per checkpoint dell'intero batch di eventi insieme dopo che il gestore di associazione li riceve. MANUAL modalità consiste nel checkpoint degli eventi da parte degli utenti. Se usato, il Checkpointer viene passato nell'intestazione del messaggio e gli utenti possono usarlo per eseguire il checkpoint.

È possibile specificare le dimensioni del batch impostando le proprietà max-size e max-wait-time con prefisso spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. La proprietà max-size è necessaria e la proprietà max-wait-time è facoltativa. Per altre informazioni, vedere la sezione proprietà consumer.

Configurazione delle dipendenze

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

In alternativa, è anche possibile usare Spring Cloud Azure Stream Event Hubs Starter, come illustrato nell'esempio seguente per Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configurazione

Il binder fornisce le tre parti seguenti delle opzioni di configurazione:

Proprietà di configurazione della connessione

Questa sezione contiene le opzioni di configurazione usate per la connessione a Hub eventi di Azure.

Nota

Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.

Proprietà configurabili della connessione di spring-cloud-azure-stream-binder-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.azure.eventhubsabilitato booleano Indica se hub eventi di Azure è abilitato.
spring.cloud.azure.eventhubsstringa di connessione Corda Valore della stringa di connessione dello spazio dei nomi di Hub eventi.
spring.cloud.azure.eventhubs.namespace Corda Valore spazio dei nomi di Hub eventi, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corda Nome di dominio di un valore dello spazio dei nomi di Hub eventi di Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Corda Indirizzo endpoint personalizzato.

Mancia

Le opzioni di configurazione di Common Azure Service SDK sono configurabili anche per lo strumento di associazione di Hub eventi di Stream di Spring Cloud. Le opzioni di configurazione supportate vengono introdotte in configurazione di Spring Cloud Azuree possono essere configurate con il prefisso unificato spring.cloud.azure. o il prefisso di spring.cloud.azure.eventhubs..

Il binder supporta anche Spring Could Azure Resource Manager per impostazione predefinita. Per informazioni su come recuperare la stringa di connessione con le entità di sicurezza non concesse con ruoli correlati Data, vedere la sezione utilizzo di base di Spring Potrebbe azure Resource Manager.

Proprietà di configurazione del checkpoint

Questa sezione contiene le opzioni di configurazione per il servizio BLOB di archiviazione, che viene usato per rendere persistenti le informazioni sulla proprietà della partizione e sul checkpoint.

Nota

Dalla versione 4.0.0, quando la proprietà di spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists non è abilitata manualmente, nessun contenitore di archiviazione verrà creato automaticamente con il nome da spring.cloud.stream.bindings.binding-name.destination.

Checkpoint delle proprietà configurabili di spring-cloud-azure-stream-binder-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists Booleano Indica se consentire la creazione di contenitori se non esiste.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corda Nome dell'account di archiviazione.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corda Chiave di accesso dell'account di archiviazione.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corda Nome del contenitore di archiviazione.

Mancia

Le opzioni di configurazione di Common Azure Service SDK sono configurabili anche per l'archivio checkpoint BLOB di archiviazione. Le opzioni di configurazione supportate vengono introdotte in configurazione di Spring Cloud Azuree possono essere configurate con il prefisso unificato spring.cloud.azure. o il prefisso di spring.cloud.azure.eventhubs.processor.checkpoint-store.

Proprietà di configurazione dell'associazione di Hub eventi di Azure

Le opzioni seguenti sono suddivise in quattro sezioni: Proprietà consumer, Configurazioni consumer avanzate, Proprietà producer e Configurazioni producer avanzate.

Proprietà consumer

Queste proprietà vengono esposte tramite EventHubsConsumerProperties.

Nota

Per evitare ripetizioni, poiché la versione 4.19.0 e 5.19.0, Spring Cloud Stream Binder Event Hubs supporta l'impostazione dei valori per tutti i canali, nel formato di spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Proprietà configurabili dal consumer di spring-cloud-azure-stream-binder-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Modalità checkpoint usata quando l'utente decide come eseguire il checkpoint del messaggio
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Numero intero Decide la quantità di messaggio per ogni partizione per eseguire un checkpoint. Avrà effetto solo quando viene usata PARTITION_COUNT modalità checkpoint.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Durata Decide l'intervallo di tempo per eseguire un checkpoint. Avrà effetto solo quando viene usata TIME modalità checkpoint.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Numero intero Numero massimo di eventi in un batch. Obbligatorio per la modalità batch-consumer.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Durata Durata massima del tempo per l'utilizzo in batch. Avrà effetto solo quando la modalità batch-consumer è abilitata ed è facoltativa.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Durata Durata dell'intervallo per l'aggiornamento.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Strategia di bilanciamento del carico.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Durata Durata dell'ora successiva alla quale scade la proprietà della partizione.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleano Indica se il processore di eventi deve richiedere informazioni sull'ultimo evento accodato nella partizione associata e tenere traccia di tali informazioni come eventi ricevuti.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Numero intero Il conteggio usato dal consumer per controllare il numero di eventi che il consumer dell'hub eventi riceverà e accoderà attivamente in locale.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Eseguire il mapping con la chiave come ID partizione e valori di StartPositionProperties Mappa contenente la posizione dell'evento da usare per ogni partizione se non esiste un checkpoint per la partizione nell'archivio checkpoint. Questa mappa viene chiaveta all'esterno dell'ID partizione.

Nota

La configurazione initial-partition-event-position accetta un map per specificare la posizione iniziale per ogni hub eventi. Pertanto, la chiave è l'ID partizione e il valore è di StartPositionProperties, che include proprietà di offset, numero di sequenza, data accodata e se inclusivo. Ad esempio, è possibile impostarlo come

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configurazione consumer avanzata

Il connessione precedente, checkpointe client di Azure SDK comune la personalizzazione della configurazione per ogni consumer di binder, che è possibile configurare con il prefisso spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Proprietà del producer

Queste proprietà vengono esposte tramite EventHubsProducerProperties.

Nota

Per evitare ripetizioni, poiché la versione 4.19.0 e 5.19.0, Spring Cloud Stream Binder Event Hubs supporta l'impostazione dei valori per tutti i canali, nel formato di spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Proprietà configurabili dal producer di spring-cloud-azure-stream-binder-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booleano Flag switch per la sincronizzazione del producer. Se true, il producer attenderà una risposta dopo un'operazione di invio.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lungo Quantità di tempo di attesa per una risposta dopo un'operazione di invio. Avrà effetto solo quando è abilitato un producer di sincronizzazione.
Configurazione avanzata del producer

Il connessione precedente e client di Azure SDK comune la personalizzazione del supporto della configurazione per ogni producer di binder, che è possibile configurare con il prefisso spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Utilizzo di base

Invio e ricezione di messaggi da/a Hub eventi

  1. Compilare le opzioni di configurazione con le informazioni sulle credenziali.

    • Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  • Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definire i fornitori e i consumatori.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Supporto del partizionamento

Viene creata una PartitionSupplier con informazioni sulla partizione fornite dall'utente per configurare le informazioni sulla partizione sul messaggio da inviare. Il diagramma di flusso seguente illustra il processo di recupero di priorità diverse per l'ID e la chiave di partizione:

Diagramma che mostra un diagramma di flusso del processo di supporto del partizionamento.

Supporto consumer batch

  1. Specificare le opzioni di configurazione batch, come illustrato nell'esempio seguente:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definire i fornitori e i consumatori.

    Per la modalità di checkpoint come BATCH, è possibile usare il codice seguente per inviare messaggi e utilizzarli in batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Per la modalità di checkpoint come MANUAL, è possibile usare il codice seguente per inviare messaggi e utilizzare/checkpoint in batch.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nota

Nella modalità di utilizzo batch, il tipo di contenuto predefinito di Spring Cloud Stream Binder è application/json, quindi assicurarsi che il payload del messaggio sia allineato al tipo di contenuto. Ad esempio, quando si usa il tipo di contenuto predefinito di application/json per ricevere messaggi con payload String, il payload deve essere JSON String, racchiuso tra virgolette doppie per il testo originale String. Anche se per text/plain tipo di contenuto, può essere direttamente un oggetto String. Per altre informazioni, vedere Spring Cloud Stream Content Type Negotiation.

Gestire i messaggi di errore

  • Spring Cloud Azure 5.x
  • Spring Cloud Azure 4.x
  • Gestire i messaggi di errore di associazione in uscita

    Per impostazione predefinita, Spring Integration crea un canale di errore globale denominato errorChannel. Configurare l'endpoint del messaggio seguente per gestire i messaggi di errore di associazione in uscita.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gestire i messaggi di errore di associazione in ingresso

    Spring Cloud Stream Event Hubs Binder supporta una soluzione per gestire gli errori per le associazioni di messaggi in ingresso: gestori degli errori.

    gestore errori:

    Spring Cloud Stream espone un meccanismo per fornire un gestore degli errori personalizzato aggiungendo un Consumer che accetta istanze di ErrorMessage. Per altre informazioni, vedere Gestire i messaggi di errore nella documentazione di Spring Cloud Stream.

    • Gestore degli errori predefinito dell'associazione

      Configurare un singolo Consumer bean per utilizzare tutti i messaggi di errore di associazione in ingresso. La funzione predefinita seguente sottoscrive ogni canale di errore di associazione in ingresso:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      È anche necessario impostare la proprietà spring.cloud.stream.default.error-handler-definition sul nome della funzione.

    • Gestore degli errori specifici dell'associazione

      Configurare un Consumer bean per utilizzare i messaggi di errore di associazione in ingresso specifici. La funzione seguente sottoscrive il canale di errore di associazione in ingresso specifico e ha una priorità più alta rispetto al gestore degli errori di binding predefinito:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      È anche necessario impostare la proprietà spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition sul nome della funzione.

Intestazioni dei messaggi di Hub eventi

Per le intestazioni dei messaggi di base supportate, vedere la sezione intestazioni dei messaggi di Hub eventi di supporto di Spring Cloud Azure per Spring Integration.

Supporto di più binder

La connessione a più spazi dei nomi di Hub eventi è supportata anche tramite più binder. Questo esempio accetta una stringa di connessione come esempio. Sono supportate anche le credenziali delle entità servizio e delle identità gestite. È possibile impostare proprietà correlate nelle impostazioni dell'ambiente di ogni gestore di associazione.

  1. Per usare più binder con Hub eventi, configurare le proprietà seguenti nel file application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    Il file dell'applicazione precedente illustra come configurare un singolo poller predefinito per l'applicazione in tutte le associazioni. Se si vuole configurare il poller per un'associazione specifica, è possibile usare una configurazione come spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Abbiamo bisogno di definire due fornitori e due consumatori:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisioning delle risorse

Lo strumento di associazione di Hub eventi supporta il provisioning dell'hub eventi e del gruppo di consumer, gli utenti possono usare le proprietà seguenti per abilitare il provisioning.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

Campioni

Per altre informazioni, vedere il repository azure-spring-boot-samples in GitHub.

Spring Cloud Stream Binder per il bus di servizio di Azure

Concetti chiave

Spring Cloud Stream Binder per il bus di servizio di Azure fornisce l'implementazione dell'associazione per Spring Cloud Stream Framework. Questa implementazione usa gli adapter del canale del bus di servizio Spring Integration alla base.

Messaggio pianificato

Questo gestore di associazione supporta l'invio di messaggi a un argomento per l'elaborazione ritardata. Gli utenti possono inviare messaggi pianificati con intestazione x-delay espressa in millisecondi un tempo di ritardo per il messaggio. Il messaggio verrà recapitato ai rispettivi argomenti dopo x-delay millisecondi.

Gruppo di consumer

Argomento del bus di servizio offre un supporto simile al gruppo di consumer di Apache Kafka, ma con logica leggermente diversa. Questo binder si basa su Subscription di un argomento per fungere da gruppo di consumer.

Configurazione delle dipendenze

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

In alternativa, è anche possibile usare Spring Cloud Azure Stream Service Bus Starter, come illustrato nell'esempio seguente per Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configurazione

Il binder fornisce le due parti seguenti delle opzioni di configurazione:

Proprietà di configurazione della connessione

Questa sezione contiene le opzioni di configurazione usate per la connessione al bus di servizio di Azure.

Nota

Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.

Proprietà configurabili della connessione di spring-cloud-azure-stream-binder-servicebus:

Proprietà Digitare Descrizione
spring.cloud.azure.servicebusabilitato booleano Indica se un bus di servizio di Azure è abilitato.
spring.cloud.azure.servicebus.connection-string Corda Valore della stringa di connessione dello spazio dei nomi del bus di servizio.
spring.cloud.azure.servicebus.custom-endpoint-address Corda Indirizzo dell'endpoint personalizzato da usare per la connessione al bus di servizio.
spring.cloud.azure.servicebus.namespace Corda Valore spazio dei nomi del bus di servizio, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corda Nome di dominio di un valore dello spazio dei nomi del bus di servizio di Azure.

Nota

Le opzioni di configurazione di Common Azure Service SDK sono configurabili anche per lo strumento di associazione del bus di servizio spring cloud di Azure Stream. Le opzioni di configurazione supportate vengono introdotte in configurazione di Spring Cloud Azuree possono essere configurate con il prefisso unificato spring.cloud.azure. o il prefisso di spring.cloud.azure.servicebus..

Il binder supporta anche Spring Could Azure Resource Manager per impostazione predefinita. Per informazioni su come recuperare la stringa di connessione con le entità di sicurezza non concesse con ruoli correlati Data, vedere la sezione utilizzo di base di Spring Potrebbe azure Resource Manager.

Proprietà di configurazione dell'associazione del bus di servizio di Azure

Le opzioni seguenti sono suddivise in quattro sezioni: Proprietà consumer, Configurazioni consumer avanzate, Proprietà producer e Configurazioni producer avanzate.

Proprietà consumer

Queste proprietà vengono esposte tramite ServiceBusConsumerProperties.

Nota

Per evitare ripetizioni, poiché la versione 4.19.0 e 5.19.0, Spring Cloud Stream Binder Service Bus supporta l'impostazione dei valori per tutti i canali, nel formato di spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Proprietà configurabili dal consumer di spring-cloud-azure-stream-binder-servicebus:

Proprietà Digitare Default Descrizione
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booleano falso Se i messaggi non riusciti vengono indirizzati alla DQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Numero intero 1 Numero massimo di messaggi simultanei che il client processore del bus di servizio deve elaborare. Quando la sessione è abilitata, si applica a ogni sessione.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Numero intero nullo Numero massimo di sessioni simultanee da elaborare in un determinato momento.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booleano nullo Indica se la sessione è abilitata.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Numero intero 0 Numero di prelettura del client del processore del bus di servizio.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Accodamento secondario nessuno Tipo della coda secondaria a cui connettersi.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Durata 5m Quantità di tempo per continuare il rinnovo automatico del blocco.
spring.cloud.stream.servicebus.bindings.binding-name.consumermodalità ricezione ServiceBusReceiveMode peek_lock Modalità di ricezione del client del processore del bus di servizio.
spring.cloud.stream.servicebus.bindings.binding-name.consumercompletamento automatico Booleano vero Indica se risolvere automaticamente i messaggi. Se impostato su false, verrà aggiunta un'intestazione di messaggio di Checkpointer per consentire agli sviluppatori di risolvere manualmente i messaggi.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabyte Lungo 1024 Dimensioni massime della coda o dell'argomento in megabyte, ovvero la dimensione della memoria allocata per la coda o l'argomento.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Durata P10675199DT2H48M5.4775807S. (10675199 giorni, 2 ore, 48 minuti, 5 secondi e 477 millisecondi) Durata dopo la quale il messaggio scade, a partire da quando il messaggio viene inviato al bus di servizio.

Importante

Quando si usa Azure Resource Manager (ARM), è necessario configurare la proprietà spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Per altre informazioni, vedere l'esempio di servicebus-queue-binder-arm su GitHub.

Configurazione consumer avanzata

Il connessione precedente e client di Azure SDK comune la personalizzazione del supporto della configurazione per ogni consumer di binder, che è possibile configurare con il prefisso spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Proprietà del producer

Queste proprietà vengono esposte tramite ServiceBusProducerProperties.

Nota

Per evitare ripetizioni, poiché la versione 4.19.0 e 5.19.0, Spring Cloud Stream Binder Service Bus supporta l'impostazione dei valori per tutti i canali, nel formato di spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Proprietà configurabili dal producer di spring-cloud-azure-stream-binder-servicebus:

Proprietà Digitare Default Descrizione
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booleano falso Flag switch per la sincronizzazione del producer.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lungo 10000 Valore di timeout per l'invio del producer.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nullo Tipo di entità del bus di servizio del producer, necessario per il producer di associazione.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabyte Lungo 1024 Dimensioni massime della coda o dell'argomento in megabyte, ovvero la dimensione della memoria allocata per la coda o l'argomento.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Durata P10675199DT2H48M5.4775807S. (10675199 giorni, 2 ore, 48 minuti, 5 secondi e 477 millisecondi) Durata dopo la quale il messaggio scade, a partire da quando il messaggio viene inviato al bus di servizio.

Importante

Quando si usa il producer di associazione, è necessario configurare la proprietà di spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Configurazione avanzata del producer

Il connessione precedente e client di Azure SDK comune la personalizzazione del supporto della configurazione per ogni producer di binder, che è possibile configurare con il prefisso spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Utilizzo di base

Invio e ricezione di messaggi da/al bus di servizio

  1. Compilare le opzioni di configurazione con le informazioni sulle credenziali.

    • Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  • Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definire i fornitori e i consumatori.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Supporto della chiave di partizione

Il gestore di associazione supporta di partizionamento del bus di servizio consentendo di impostare la chiave di partizione e l'ID sessione nell'intestazione del messaggio. Questa sezione illustra come impostare la chiave di partizione per i messaggi.

Spring Cloud Stream fornisce una proprietà dell'espressione SpEL della chiave di partizione spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Ad esempio, impostando questa proprietà come "'partitionKey-' + headers[<message-header-key>]" e aggiungendo un'intestazione denominata message-header-key. Spring Cloud Stream usa il valore per questa intestazione durante la valutazione dell'espressione per assegnare una chiave di partizione. Il codice seguente fornisce un producer di esempio:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Supporto delle sessioni

Il gestore di associazione supporta sessioni di messaggi del bus di servizio. L'ID sessione di un messaggio può essere impostato tramite l'intestazione del messaggio.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nota

In base a partizionamento del bus di servizio, l'ID sessione ha una priorità più alta rispetto alla chiave di partizione. Pertanto, quando vengono impostate entrambe le intestazioni ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY, il valore dell'ID sessione viene infine usato per sovrascrivere il valore della chiave di partizione.

Gestire i messaggi di errore

  • Spring Cloud Azure 5.x
  • Spring Cloud Azure 4.x
  • Gestire i messaggi di errore di associazione in uscita

    Per impostazione predefinita, Spring Integration crea un canale di errore globale denominato errorChannel. Configurare l'endpoint del messaggio seguente per gestire il messaggio di errore di associazione in uscita.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gestire i messaggi di errore di associazione in ingresso

    Spring Cloud Stream Service Bus Binder supporta due soluzioni per gestire gli errori per le associazioni di messaggi in ingresso: il gestore degli errori del binder e i gestori.

    gestore errori binder:

    Il gestore degli errori del gestore di associazione predefinito gestisce l'associazione in ingresso. Questo gestore viene usato per inviare messaggi non riusciti alla coda dei messaggi non recapitabili quando spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected è abilitato. In caso contrario, i messaggi non riusciti vengono abbandonati. Il gestore degli errori del gestore di associazione si escludono a vicenda con altri gestori errori forniti.

    gestore degli errori:

    Spring Cloud Stream espone un meccanismo per fornire un gestore degli errori personalizzato aggiungendo un Consumer che accetta istanze di ErrorMessage. Per altre informazioni, vedere Gestire i messaggi di errore nella documentazione di Spring Cloud Stream.

    • Gestore degli errori predefinito dell'associazione

      Configurare un singolo Consumer bean per utilizzare tutti i messaggi di errore di associazione in ingresso. La funzione predefinita seguente sottoscrive ogni canale di errore di associazione in ingresso:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      È anche necessario impostare la proprietà spring.cloud.stream.default.error-handler-definition sul nome della funzione.

    • Gestore degli errori specifici dell'associazione

      Configurare un Consumer bean per utilizzare i messaggi di errore di associazione in ingresso specifici. La funzione seguente sottoscrive il canale di errore di associazione in ingresso specifico con una priorità più alta rispetto al gestore degli errori di binding predefinito.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      È anche necessario impostare la proprietà spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition sul nome della funzione.

Intestazioni dei messaggi del bus di servizio

Per le intestazioni dei messaggi di base supportate, vedere la sezione intestazioni dei messaggi del bus di servizio di supporto di Spring Cloud Azure per Spring Integration.

Nota

Quando si imposta la chiave di partizione, la priorità dell'intestazione del messaggio è superiore alla proprietà Spring Cloud Stream. Pertanto, spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression diventa effettivo solo quando non è configurata nessuna delle intestazioni ServiceBusMessageHeaders#SESSION_ID e ServiceBusMessageHeaders#PARTITION_KEY.

Supporto di più binder

La connessione a più spazi dei nomi del bus di servizio è supportata anche tramite più binder. Questo esempio accetta la stringa di connessione come esempio. Sono supportate anche le credenziali delle entità servizio e delle identità gestite. Gli utenti possono impostare proprietà correlate nelle impostazioni dell'ambiente di ogni binder.

  1. Per usare più binder di ServiceBus, configurare le proprietà seguenti nel file application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nota

    Il file dell'applicazione precedente illustra come configurare un singolo poller predefinito per l'applicazione in tutte le associazioni. Se si vuole configurare il poller per un'associazione specifica, è possibile usare una configurazione come spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. abbiamo bisogno di definire due fornitori e due consumatori

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisioning delle risorse

Lo strumento di associazione del bus di servizio supporta il provisioning di code, argomenti e sottoscrizioni, gli utenti possono usare le proprietà seguenti per abilitare il provisioning.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

Personalizzare le proprietà client del bus di servizio

Gli sviluppatori possono usare AzureServiceClientBuilderCustomizer per personalizzare le proprietà client del bus di servizio. Nell'esempio seguente viene personalizzata la proprietà sessionIdleTimeout in ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Campioni

Per altre informazioni, vedere il repository azure-spring-boot-samples in GitHub.