Condividi tramite


Supporto di Spring Cloud Azure per Spring Integration

Questo articolo si applica a:✅ versione 4.19.0 ✅ versione 5.19.0

L'estensione Spring Integration per Azure offre adattatori Spring Integration per i vari servizi forniti dall'Azure SDK per Java. È disponibile il supporto spring integration per questi servizi di Azure: Hub eventi, bus di servizio, coda di archiviazione. Di seguito è riportato un elenco di adattatori supportati:

Integrazione di Spring con Hub eventi di Azure

Concetti chiave

Hub eventi di Azure è una piattaforma di streaming di Big Data e un servizio di inserimento di eventi. Può ricevere ed elaborare milioni di eventi al secondo. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di batch/archiviazione.

Spring Integration consente la messaggistica leggera all'interno di applicazioni basate su Spring e supporta l'integrazione con sistemi esterni tramite adattatori dichiarativi. Tali adattatori offrono un livello superiore di astrazione rispetto al supporto di Spring per la comunicazione remota, la messaggistica e la pianificazione. Il progetto di estensione Spring Integration per Hub eventi fornisce schede di canale in ingresso e in uscita e gateway per Hub eventi di Azure.

Nota

Le API di supporto di RxJava vengono eliminate dalla versione 4.0.0. Per informazioni dettagliate, vedere Javadoc.

Gruppo di consumer

Hub eventi offre un supporto simile al gruppo di consumer di Apache Kafka, ma con logica leggermente diversa. Mentre Kafka archivia tutti gli offset di cui è stato eseguito il commit nel broker, è necessario archiviare gli offset dei messaggi di Hub eventi elaborati manualmente. Hub eventi SDK fornisce la funzione per archiviare tali offset all'interno di Archiviazione di Azure.

Supporto del partizionamento

Hub eventi offre un concetto simile a quello della partizione fisica di Kafka. Tuttavia, a differenza del ribilanciamento automatico di Kafka tra consumer e partizioni, Hub eventi offre una sorta di modalità preemptive. L'account di archiviazione funge da lease per determinare quale partizione è di proprietà del consumer. All'avvio di un nuovo consumer, tenterà di rubare alcune partizioni dai consumer con carico elevato per ottenere il bilanciamento del carico di lavoro.

Per specificare la strategia di bilanciamento del carico, gli sviluppatori possono usare EventHubsContainerProperties per la configurazione. Vedere la sezione seguente per un esempio di come configurare EventHubsContainerProperties.

Supporto consumer batch

Il EventHubsInboundChannelAdapter supporta la modalità di utilizzo batch. Per abilitarla, gli utenti possono specificare la modalità listener come ListenerMode.BATCH durante la costruzione di un'istanza di EventHubsInboundChannelAdapter. Se abilitata, verrà ricevuto e passato al canale downstream un messaggio di di cui il payload è un elenco di eventi in batch. Ogni intestazione di messaggio viene convertita anche come elenco, di cui il contenuto è il valore di intestazione associato analizzato da ogni evento. Per le intestazioni comuni di ID partizione, checkpointer e ultime proprietà accodate, vengono presentate come un singolo valore per l'intero batch di eventi condivide lo stesso. Per altre informazioni, vedere la sezione intestazioni dei messaggi di Hub eventi .

Nota

L'intestazione del checkpoint esiste solo quando viene usata modalità checkpoint MANUAL.

Il checkpoint del consumer batch supporta due modalità: BATCH e MANUAL. BATCH modalità è una modalità di checkpoint automatico per checkpoint dell'intero batch di eventi insieme dopo che sono stati ricevuti. MANUAL modalità consiste nel checkpoint degli eventi da parte degli utenti. Se usato, il del checkpoint verrà passato all'intestazione del messaggio e gli utenti potrebbero usarlo per eseguire il checkpoint.

I criteri di utilizzo batch possono essere specificati dalle proprietà di max-size e max-wait-time, dove max-size è una proprietà necessaria mentre max-wait-time è facoltativo. Per specificare la strategia di utilizzo batch, gli sviluppatori possono usare EventHubsContainerProperties per la configurazione. Vedere la sezione seguente per un esempio di come configurare EventHubsContainerProperties.

Configurazione delle dipendenze

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configurazione

Questo strumento di avvio fornisce le tre parti seguenti delle opzioni di configurazione:

Proprietà di configurazione della connessione

Questa sezione contiene le opzioni di configurazione usate per la connessione a Hub eventi di Azure.

Nota

Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.

Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.azure.eventhubsabilitato booleano Indica se hub eventi di Azure è abilitato.
spring.cloud.azure.eventhubsstringa di connessione Corda Valore della stringa di connessione dello spazio dei nomi di Hub eventi.
spring.cloud.azure.eventhubs.namespace Corda Valore spazio dei nomi di Hub eventi, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corda Nome di dominio di un valore dello spazio dei nomi di Hub eventi di Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Corda Indirizzo endpoint personalizzato.
spring.cloud.azure.eventhubs.shared-connection Booleano Se l'oggetto EventProcessorClient e EventHubProducerAsyncClient sottostante usano la stessa connessione. Per impostazione predefinita, viene creata una nuova connessione e viene usata per ogni client dell'hub eventi creato.

Proprietà di configurazione del checkpoint

Questa sezione contiene le opzioni di configurazione per il servizio BLOB di archiviazione, che viene usato per rendere persistenti le informazioni sulla proprietà della partizione e sul checkpoint.

Nota

Dalla versione 4.0.0, quando la proprietà di spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists non è abilitata manualmente, non verrà creato automaticamente alcun contenitore di archiviazione.

Checkpoint delle proprietà configurabili di spring-cloud-azure-starter-integration-eventhubs:

Proprietà Digitare Descrizione
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists Booleano Indica se consentire la creazione di contenitori se non esiste.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corda Nome dell'account di archiviazione.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corda Chiave di accesso dell'account di archiviazione.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corda Nome del contenitore di archiviazione.

Le opzioni di configurazione di Common Azure Service SDK sono configurabili anche per l'archivio checkpoint BLOB di archiviazione. Le opzioni di configurazione supportate vengono introdotte in configurazione di Spring Cloud Azuree possono essere configurate con il prefisso unificato spring.cloud.azure. o il prefisso di spring.cloud.azure.eventhubs.processor.checkpoint-store.

Proprietà di configurazione del processore dell'hub eventi

Il EventHubsInboundChannelAdapter usa il EventProcessorClient per utilizzare i messaggi di un hub eventi, per configurare le proprietà complessive di un EventProcessorClient, gli sviluppatori possono usare EventHubsContainerProperties per la configurazione. Vedere sezione seguente su come usare EventHubsInboundChannelAdapter.

Utilizzo di base

Inviare messaggi a Hub eventi di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

    • Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  1. Creare DefaultMessageHandler con il bean EventHubsTemplate per inviare messaggi a Hub eventi.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Creare un'associazione del gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Inviare messaggi usando il gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ricevere messaggi da Hub eventi di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

  2. Creare un bean di canale di messaggio come canale di input.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Creare EventHubsInboundChannelAdapter con il bean EventHubsMessageListenerContainer per ricevere messaggi da Hub eventi.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Creare un binding del ricevitore di messaggi con EventHubsInboundChannelAdapter tramite il canale di messaggio creato in precedenza.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurare EventHubsMessageConverter per personalizzare objectMapper

EventHubsMessageConverter viene creato come bean configurabile per consentire agli utenti di personalizzare ObjectMapper.

Supporto consumer batch

Per utilizzare i messaggi di Hub eventi in batch è simile all'esempio precedente, oltre agli utenti è necessario impostare le opzioni di configurazione correlate a consumo in batch per EventHubsInboundChannelAdapter.

Quando si crea EventHubsInboundChannelAdapter, la modalità listener deve essere impostata come BATCH. Quando si crea un bean di EventHubsMessageListenerContainer, impostare la modalità di checkpoint come MANUAL o BATCHe le opzioni batch possono essere configurate in base alle esigenze.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Intestazioni dei messaggi di Hub eventi

Nella tabella seguente viene illustrato il mapping delle proprietà dei messaggi di Hub eventi alle intestazioni dei messaggi Spring. Per Hub eventi di Azure, il messaggio viene chiamato come event.

Mapping tra le proprietà messaggio/evento di Hub eventi e le intestazioni di spring message in modalità listener record:

Proprietà evento di Hub eventi Costanti di intestazione spring message Digitare Descrizione
Tempo accodato EventHubsHeaders#ENQUEUED_TIME Istante Istante, in formato UTC, di quando l'evento è stato accodato nella partizione dell'hub eventi.
Compensare EventHubsHeaders#OFFSET Lungo Offset dell'evento quando è stato ricevuto dalla partizione dell'hub eventi associata.
Chiave di partizione AzureHeaders#PARTITION_KEY Corda Chiave di hashing della partizione se è stata impostata durante la pubblicazione originale dell'evento.
ID partizione AzureHeaders#RAW_PARTITION_ID Corda ID di partizione dell'hub eventi.
Numero di sequenza EventHubsHeaders#SEQUENCE_NUMBER Lungo Numero di sequenza assegnato all'evento quando è stato accodato nella partizione dell'hub eventi associata.
Last enqueued event properties EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Proprietà dell'ultimo evento accodato in questa partizione.
NA AzureHeaders#CHECKPOINTER Checkpointer Intestazione per il checkpoint del messaggio specifico.

Gli utenti possono analizzare le intestazioni del messaggio per le informazioni correlate di ogni evento. Per impostare un'intestazione di messaggio per l'evento, tutte le intestazioni personalizzate verranno inserite come proprietà dell'applicazione di un evento, in cui l'intestazione viene impostata come chiave della proprietà. Quando gli eventi vengono ricevuti da Hub eventi, tutte le proprietà dell'applicazione verranno convertite nell'intestazione del messaggio.

Nota

Le intestazioni del messaggio della chiave di partizione, il tempo accodato, l'offset e il numero di sequenza non sono supportati per essere impostati manualmente.

Quando la modalità batch-consumer è abilitata, le intestazioni specifiche dei messaggi in batch sono elencate di seguito, che contiene un elenco di valori di ogni singolo evento di Hub eventi.

Mapping tra le proprietà messaggio/evento di Hub eventi e le intestazioni di spring message in modalità listener batch:

Proprietà evento di Hub eventi Costanti di intestazione messaggio Spring Batch Digitare Descrizione
Tempo accodato EventHubsHeaders#ENQUEUED_TIME Elenco di istantanee Elenco dell'istante, in formato UTC, di quando ogni evento è stato accodato nella partizione dell'hub eventi.
Compensare EventHubsHeaders#OFFSET Elenco di long Elenco dell'offset di ogni evento quando è stato ricevuto dalla partizione di Hub eventi associata.
Chiave di partizione AzureHeaders#PARTITION_KEY Elenco di stringhe Elenco della chiave di hashing della partizione se è stata impostata durante la pubblicazione originale di ogni evento.
Numero di sequenza EventHubsHeaders#SEQUENCE_NUMBER Elenco di long Elenco del numero di sequenza assegnato a ogni evento quando è stato accodato nella partizione di Hub eventi associata.
Proprietà di sistema EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Elenco di mappe Elenco delle proprietà di sistema di ogni evento.
Proprietà dell'applicazione EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Elenco di mappe Elenco delle proprietà dell'applicazione di ogni evento, in cui vengono inserite tutte le intestazioni o le proprietà dell'evento personalizzate.

Nota

Quando si pubblicano messaggi, tutte le intestazioni batch precedenti verranno rimosse dai messaggi, se presenti.

Campioni

Per altre informazioni, vedere il repository azure-spring-boot-samples in GitHub.

Integrazione spring con il bus di servizio di Azure

Concetti chiave

Spring Integration consente la messaggistica leggera all'interno di applicazioni basate su Spring e supporta l'integrazione con sistemi esterni tramite adattatori dichiarativi.

Il progetto di estensione Spring Integration per il bus di servizio di Azure fornisce schede di canale in ingresso e in uscita per il bus di servizio di Azure.

Nota

Le API di supporto completableFuture sono state deprecate dalla versione 2.10.0 ed è sostituita da Reactor Core dalla versione 4.0.0. Per informazioni dettagliate, vedere Javadoc.

Configurazione delle dipendenze

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configurazione

Questo strumento di avvio fornisce le due parti seguenti delle opzioni di configurazione:

Proprietà di configurazione della connessione

Questa sezione contiene le opzioni di configurazione usate per la connessione al bus di servizio di Azure.

Nota

Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.

Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-servicebus:

Proprietà Digitare Descrizione
spring.cloud.azure.servicebusabilitato booleano Indica se un bus di servizio di Azure è abilitato.
spring.cloud.azure.servicebus.connection-string Corda Valore della stringa di connessione dello spazio dei nomi del bus di servizio.
spring.cloud.azure.servicebus.custom-endpoint-address Corda Indirizzo dell'endpoint personalizzato da usare per la connessione al bus di servizio.
spring.cloud.azure.servicebus.namespace Corda Valore spazio dei nomi del bus di servizio, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corda Nome di dominio di un valore dello spazio dei nomi del bus di servizio di Azure.

Proprietà di configurazione del processore del bus di servizio

Il ServiceBusInboundChannelAdapter usa il ServiceBusProcessorClient per utilizzare i messaggi, per configurare le proprietà complessive di un ServiceBusProcessorClient, gli sviluppatori possono usare ServiceBusContainerProperties per la configurazione. Vedere sezione seguente su come usare ServiceBusInboundChannelAdapter.

Utilizzo di base

Inviare messaggi al bus di servizio di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

    • Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  • Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  1. Creare DefaultMessageHandler con il bean ServiceBusTemplate per inviare messaggi al bus di servizio, impostare il tipo di entità per ServiceBusTemplate. Questo esempio accetta la coda del bus di servizio come esempio.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Creare un'associazione del gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Inviare messaggi usando il gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Ricevere messaggi dal bus di servizio di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

  2. Creare un bean di canale di messaggio come canale di input.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Creare ServiceBusInboundChannelAdapter con il bean ServiceBusMessageListenerContainer per ricevere messaggi al bus di servizio. Questo esempio accetta la coda del bus di servizio come esempio.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Creare un'associazione di ricevitore di messaggi con ServiceBusInboundChannelAdapter tramite il canale di messaggio creato in precedenza.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurare ServiceBusMessageConverter per personalizzare objectMapper

ServiceBusMessageConverter viene creato come bean configurabile per consentire agli utenti di personalizzare ObjectMapper.

Intestazioni dei messaggi del bus di servizio

Per alcune intestazioni del bus di servizio di cui è possibile eseguire il mapping a più costanti di intestazione Spring, viene elencata la priorità di intestazioni Spring diverse.

Mapping tra intestazioni del bus di servizio e intestazioni Spring:

Intestazioni e proprietà del messaggio del bus di servizio Costanti dell'intestazione del messaggio Spring Digitare Configurabile Descrizione
Tipo di contenuto MessageHeaders#CONTENT_TYPE Corda Descrittore RFC2045 Content-Type del messaggio.
ID correlazione ServiceBusMessageHeaders#CORRELATION_ID Corda ID di correlazione del messaggio
ID messaggio ServiceBusMessageHeaders#MESSAGE_ID Corda ID messaggio del messaggio, questa intestazione ha priorità più alta rispetto a MessageHeaders#ID.
ID messaggio MessageHeaders#ID UUID ID messaggio del messaggio, questa intestazione ha una priorità inferiore a ServiceBusMessageHeaders#MESSAGE_ID.
Chiave di partizione ServiceBusMessageHeaders#PARTITION_KEY Corda Chiave di partizione per l'invio del messaggio a un'entità partizionata.
Rispondi a MessageHeaders#REPLY_CHANNEL Corda Indirizzo di un'entità a cui inviare risposte.
Rispondi all'ID sessione ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Corda Valore della proprietà ReplyToGroupId del messaggio.
Ora di accodamento pianificata utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Data/ora in cui il messaggio deve essere accodato nel bus di servizio, questa intestazione ha priorità più alta rispetto a AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Ora di accodamento pianificata utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Numero intero Data/ora in cui il messaggio deve essere accodato nel bus di servizio, questa intestazione ha priorità inferiore rispetto a ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID sessione ServiceBusMessageHeaders#SESSION_ID Corda IDentifier di sessione per un'entità con riconoscimento della sessione.
Durata (TTL) ServiceBusMessageHeaders#TIME_TO_LIVE Durata Durata dell'intervallo di tempo prima della scadenza del messaggio.
A ServiceBusMessageHeaders#TO Corda Indirizzo "a" del messaggio, riservato per un uso futuro negli scenari di routing e attualmente ignorato dal broker stesso.
Oggetto ServiceBusMessageHeaders#SUBJECT Corda Oggetto del messaggio.
Descrizione dell'errore di messaggi non recapitabili ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Corda No Descrizione di un messaggio non recapitato.
Motivo dei messaggi non recapitabili ServiceBusMessageHeaders#DEAD_LETTER_REASON Corda No Il motivo per cui un messaggio è stato inviato a messaggi non recapitabili.
Origine messaggi non recapitabili ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Corda No Entità in cui il messaggio è stato recapitato in un messaggio non recapitato.
Numero di recapito ServiceBusMessageHeaders#DELIVERY_COUNT lungo No Numero di volte in cui il messaggio è stato recapitato ai client.
Numero di sequenza accodato ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lungo No Numero di sequenza accodato assegnato a un messaggio dal bus di servizio.
Tempo accodato ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime No Data/ora in cui il messaggio è stato accodato nel bus di servizio.
Scade all'indirizzo ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime No Data/ora in cui il messaggio scadrà.
Token di blocco ServiceBusMessageHeaders#LOCK_TOKEN Corda No Token di blocco per il messaggio corrente.
Bloccato fino a quando ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime No Data/ora in cui scade il blocco del messaggio.
Numero di sequenza ServiceBusMessageHeaders#SEQUENCE_NUMBER lungo No Numero univoco assegnato a un messaggio dal bus di servizio.
Stato ServiceBusMessageHeaders#STATE ServiceBusMessageState No Stato del messaggio, che può essere Attivo, Posticipato o Pianificato.

Supporto della chiave di partizione

Questo starter supporta di partizionamento del bus di servizio consentendo di impostare la chiave di partizione e l'ID sessione nell'intestazione del messaggio. Questa sezione illustra come impostare la chiave di partizione per i messaggi.

Consigliato: Usare ServiceBusMessageHeaders.PARTITION_KEY come chiave dell'intestazione.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Non consigliato ma attualmente supportato:AzureHeaders.PARTITION_KEY come chiave dell'intestazione.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando sia ServiceBusMessageHeaders.PARTITION_KEY che AzureHeaders.PARTITION_KEY vengono impostati nelle intestazioni del messaggio, è preferibile ServiceBusMessageHeaders.PARTITION_KEY.

Supporto delle sessioni

In questo esempio viene illustrato come impostare manualmente l'ID sessione di un messaggio nell'applicazione.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando la ServiceBusMessageHeaders.SESSION_ID viene impostata nelle intestazioni del messaggio e viene impostata anche un'intestazione di ServiceBusMessageHeaders.PARTITION_KEY diversa, il valore dell'ID sessione verrà infine usato per sovrascrivere il valore della chiave di partizione.

Personalizzare le proprietà client del bus di servizio

Gli sviluppatori possono usare AzureServiceClientBuilderCustomizer per personalizzare le proprietà client del bus di servizio. Nell'esempio seguente viene personalizzata la proprietà sessionIdleTimeout in ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Campioni

Per altre informazioni, vedere il repository azure-spring-boot-samples in GitHub.

Integrazione spring con coda di archiviazione di Azure

Concetti chiave

Archiviazione code di Azure è un servizio per l'archiviazione di un numero elevato di messaggi. È possibile accedere ai messaggi da qualsiasi parte del mondo tramite chiamate autenticate tramite HTTP o HTTPS. Un messaggio in coda può avere dimensioni fino a 64 KB. Una coda può contenere milioni di messaggi, fino al limite di capacità totale di un account di archiviazione. Le code vengono comunemente usate per creare un backlog di lavoro da elaborare in modo asincrono.

Configurazione delle dipendenze

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configurazione

Questo strumento di avvio offre le opzioni di configurazione seguenti:

Proprietà di configurazione della connessione

Questa sezione contiene le opzioni di configurazione usate per la connessione alla coda di archiviazione di Azure.

Nota

Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.

Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-storage-queue:

Proprietà Digitare Descrizione
spring.cloud.azure.storage.queue.enabled booleano Indica se è abilitata una coda di archiviazione di Azure.
spring.cloud.azure.storage.queue.connection-string Corda Valore della stringa di connessione dello spazio dei nomi della coda di archiviazione.
spring.cloud.azure.storage.queue.accountName Corda Nome dell'account della coda di archiviazione.
spring.cloud.azure.storage.queue.accountKey Corda Chiave dell'account della coda di archiviazione.
spring.cloud.azure.storage.queueendpoint Corda Endpoint del servizio di accodamento archiviazione.
spring.cloud.azure.storage.queue.sasToken Corda Credenziali del token di firma di accesso condiviso
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion usato durante l'esecuzione di richieste API.
spring.cloud.azure.storage.queue.messageEncoding Corda Codifica dei messaggi della coda.

Utilizzo di base

Inviare messaggi alla coda di archiviazione di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

    • Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  • Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Nota

I valori consentiti per tenant-id sono: common, organizations, consumerso l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.

  1. Creare DefaultMessageHandler con il bean StorageQueueTemplate per inviare messaggi alla coda di archiviazione.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Creare un'associazione gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Inviare messaggi usando il gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Ricevere messaggi dalla coda di archiviazione di Azure

  1. Compilare le opzioni di configurazione delle credenziali.

  2. Creare un bean di canale di messaggio come canale di input.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Creare StorageQueueMessageSource con il bean StorageQueueTemplate per ricevere messaggi nella coda di archiviazione.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Creare un binding del ricevitore di messaggi con StorageQueueMessageSource creato nell'ultimo passaggio tramite il canale di messaggio creato in precedenza.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Campioni

Per altre informazioni, vedere il repository azure-spring-boot-samples in GitHub.