Supporto di Spring Cloud Azure per Spring Integration
Questo articolo si applica a:✅ versione 4.19.0 ✅ versione 5.19.0
L'estensione Spring Integration per Azure offre adattatori Spring Integration per i vari servizi forniti dall'Azure SDK per Java. È disponibile il supporto spring integration per questi servizi di Azure: Hub eventi, bus di servizio, coda di archiviazione. Di seguito è riportato un elenco di adattatori supportati:
-
spring-cloud-azure-starter-integration-eventhubs
: per altre informazioni, vedere Spring Integration con Hub eventi di Azure -
spring-cloud-azure-starter-integration-servicebus
: per altre informazioni, vedere Spring Integration with Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
: per altre informazioni, vedere Spring Integration with Azure Storage Queue
Integrazione di Spring con Hub eventi di Azure
Concetti chiave
Hub eventi di Azure è una piattaforma di streaming di Big Data e un servizio di inserimento di eventi. Può ricevere ed elaborare milioni di eventi al secondo. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di batch/archiviazione.
Spring Integration consente la messaggistica leggera all'interno di applicazioni basate su Spring e supporta l'integrazione con sistemi esterni tramite adattatori dichiarativi. Tali adattatori offrono un livello superiore di astrazione rispetto al supporto di Spring per la comunicazione remota, la messaggistica e la pianificazione. Il progetto di estensione Spring Integration per Hub eventi fornisce schede di canale in ingresso e in uscita e gateway per Hub eventi di Azure.
Nota
Le API di supporto di RxJava vengono eliminate dalla versione 4.0.0. Per informazioni dettagliate, vedere Javadoc.
Gruppo di consumer
Hub eventi offre un supporto simile al gruppo di consumer di Apache Kafka, ma con logica leggermente diversa. Mentre Kafka archivia tutti gli offset di cui è stato eseguito il commit nel broker, è necessario archiviare gli offset dei messaggi di Hub eventi elaborati manualmente. Hub eventi SDK fornisce la funzione per archiviare tali offset all'interno di Archiviazione di Azure.
Supporto del partizionamento
Hub eventi offre un concetto simile a quello della partizione fisica di Kafka. Tuttavia, a differenza del ribilanciamento automatico di Kafka tra consumer e partizioni, Hub eventi offre una sorta di modalità preemptive. L'account di archiviazione funge da lease per determinare quale partizione è di proprietà del consumer. All'avvio di un nuovo consumer, tenterà di rubare alcune partizioni dai consumer con carico elevato per ottenere il bilanciamento del carico di lavoro.
Per specificare la strategia di bilanciamento del carico, gli sviluppatori possono usare EventHubsContainerProperties
per la configurazione. Vedere la sezione seguente per un esempio di come configurare EventHubsContainerProperties
.
Supporto consumer batch
Il EventHubsInboundChannelAdapter
supporta la modalità di utilizzo batch. Per abilitarla, gli utenti possono specificare la modalità listener come ListenerMode.BATCH
durante la costruzione di un'istanza di EventHubsInboundChannelAdapter
.
Se abilitata, verrà ricevuto e passato al canale downstream un messaggio di
Nota
L'intestazione del checkpoint esiste solo quando viene usata modalità checkpoint MANUAL.
Il checkpoint del consumer batch supporta due modalità: BATCH
e MANUAL
.
BATCH
modalità è una modalità di checkpoint automatico per checkpoint dell'intero batch di eventi insieme dopo che sono stati ricevuti.
MANUAL
modalità consiste nel checkpoint degli eventi da parte degli utenti. Se usato, il del checkpoint
I criteri di utilizzo batch possono essere specificati dalle proprietà di max-size
e max-wait-time
, dove max-size
è una proprietà necessaria mentre max-wait-time
è facoltativo.
Per specificare la strategia di utilizzo batch, gli sviluppatori possono usare EventHubsContainerProperties
per la configurazione. Vedere la sezione seguente per un esempio di come configurare EventHubsContainerProperties
.
Configurazione delle dipendenze
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configurazione
Questo strumento di avvio fornisce le tre parti seguenti delle opzioni di configurazione:
Proprietà di configurazione della connessione
Questa sezione contiene le opzioni di configurazione usate per la connessione a Hub eventi di Azure.
Nota
Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.
Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-eventhubs:
Proprietà | Digitare | Descrizione |
---|---|---|
spring.cloud.azure.eventhubsabilitato | booleano | Indica se hub eventi di Azure è abilitato. |
spring.cloud.azure.eventhubsstringa di connessione | Corda | Valore della stringa di connessione dello spazio dei nomi di Hub eventi. |
spring.cloud.azure.eventhubs.namespace | Corda | Valore spazio dei nomi di Hub eventi, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Corda | Nome di dominio di un valore dello spazio dei nomi di Hub eventi di Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Corda | Indirizzo endpoint personalizzato. |
spring.cloud.azure.eventhubs.shared-connection | Booleano | Se l'oggetto EventProcessorClient e EventHubProducerAsyncClient sottostante usano la stessa connessione. Per impostazione predefinita, viene creata una nuova connessione e viene usata per ogni client dell'hub eventi creato. |
Proprietà di configurazione del checkpoint
Questa sezione contiene le opzioni di configurazione per il servizio BLOB di archiviazione, che viene usato per rendere persistenti le informazioni sulla proprietà della partizione e sul checkpoint.
Nota
Dalla versione 4.0.0, quando la proprietà di spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists non è abilitata manualmente, non verrà creato automaticamente alcun contenitore di archiviazione.
Checkpoint delle proprietà configurabili di spring-cloud-azure-starter-integration-eventhubs:
Proprietà | Digitare | Descrizione |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-exists | Booleano | Indica se consentire la creazione di contenitori se non esiste. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Corda | Nome dell'account di archiviazione. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Corda | Chiave di accesso dell'account di archiviazione. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Corda | Nome del contenitore di archiviazione. |
Le opzioni di configurazione di Common Azure Service SDK sono configurabili anche per l'archivio checkpoint BLOB di archiviazione. Le opzioni di configurazione supportate vengono introdotte in configurazione di Spring Cloud Azuree possono essere configurate con il prefisso unificato spring.cloud.azure.
o il prefisso di spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Proprietà di configurazione del processore dell'hub eventi
Il EventHubsInboundChannelAdapter
usa il EventProcessorClient
per utilizzare i messaggi di un hub eventi, per configurare le proprietà complessive di un EventProcessorClient
, gli sviluppatori possono usare EventHubsContainerProperties
per la configurazione. Vedere sezione seguente su come usare EventHubsInboundChannelAdapter
.
Utilizzo di base
Inviare messaggi a Hub eventi di Azure
Compilare le opzioni di configurazione delle credenziali.
Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Nota
I valori consentiti per tenant-id
sono: common
, organizations
, consumers
o l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.
Creare
DefaultMessageHandler
con il beanEventHubsTemplate
per inviare messaggi a Hub eventi.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Creare un'associazione del gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Inviare messaggi usando il gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Ricevere messaggi da Hub eventi di Azure
Compilare le opzioni di configurazione delle credenziali.
Creare un bean di canale di messaggio come canale di input.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Creare
EventHubsInboundChannelAdapter
con il beanEventHubsMessageListenerContainer
per ricevere messaggi da Hub eventi.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Creare un binding del ricevitore di messaggi con EventHubsInboundChannelAdapter tramite il canale di messaggio creato in precedenza.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurare EventHubsMessageConverter per personalizzare objectMapper
EventHubsMessageConverter
viene creato come bean configurabile per consentire agli utenti di personalizzare ObjectMapper.
Supporto consumer batch
Per utilizzare i messaggi di Hub eventi in batch è simile all'esempio precedente, oltre agli utenti è necessario impostare le opzioni di configurazione correlate a consumo in batch per EventHubsInboundChannelAdapter
.
Quando si crea EventHubsInboundChannelAdapter
, la modalità listener deve essere impostata come BATCH
. Quando si crea un bean di EventHubsMessageListenerContainer
, impostare la modalità di checkpoint come MANUAL
o BATCH
e le opzioni batch possono essere configurate in base alle esigenze.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Intestazioni dei messaggi di Hub eventi
Nella tabella seguente viene illustrato il mapping delle proprietà dei messaggi di Hub eventi alle intestazioni dei messaggi Spring. Per Hub eventi di Azure, il messaggio viene chiamato come event
.
Mapping tra le proprietà messaggio/evento di Hub eventi e le intestazioni di spring message in modalità listener record:
Proprietà evento di Hub eventi | Costanti di intestazione spring message | Digitare | Descrizione |
---|---|---|---|
Tempo accodato | EventHubsHeaders#ENQUEUED_TIME | Istante | Istante, in formato UTC, di quando l'evento è stato accodato nella partizione dell'hub eventi. |
Compensare | EventHubsHeaders#OFFSET | Lungo | Offset dell'evento quando è stato ricevuto dalla partizione dell'hub eventi associata. |
Chiave di partizione | AzureHeaders#PARTITION_KEY | Corda | Chiave di hashing della partizione se è stata impostata durante la pubblicazione originale dell'evento. |
ID partizione | AzureHeaders#RAW_PARTITION_ID | Corda | ID di partizione dell'hub eventi. |
Numero di sequenza | EventHubsHeaders#SEQUENCE_NUMBER | Lungo | Numero di sequenza assegnato all'evento quando è stato accodato nella partizione dell'hub eventi associata. |
Last enqueued event properties | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Proprietà dell'ultimo evento accodato in questa partizione. |
NA | AzureHeaders#CHECKPOINTER | Checkpointer | Intestazione per il checkpoint del messaggio specifico. |
Gli utenti possono analizzare le intestazioni del messaggio per le informazioni correlate di ogni evento. Per impostare un'intestazione di messaggio per l'evento, tutte le intestazioni personalizzate verranno inserite come proprietà dell'applicazione di un evento, in cui l'intestazione viene impostata come chiave della proprietà. Quando gli eventi vengono ricevuti da Hub eventi, tutte le proprietà dell'applicazione verranno convertite nell'intestazione del messaggio.
Nota
Le intestazioni del messaggio della chiave di partizione, il tempo accodato, l'offset e il numero di sequenza non sono supportati per essere impostati manualmente.
Quando la modalità batch-consumer è abilitata, le intestazioni specifiche dei messaggi in batch sono elencate di seguito, che contiene un elenco di valori di ogni singolo evento di Hub eventi.
Mapping tra le proprietà messaggio/evento di Hub eventi e le intestazioni di spring message in modalità listener batch:
Proprietà evento di Hub eventi | Costanti di intestazione messaggio Spring Batch | Digitare | Descrizione |
---|---|---|---|
Tempo accodato | EventHubsHeaders#ENQUEUED_TIME | Elenco di istantanee | Elenco dell'istante, in formato UTC, di quando ogni evento è stato accodato nella partizione dell'hub eventi. |
Compensare | EventHubsHeaders#OFFSET | Elenco di long | Elenco dell'offset di ogni evento quando è stato ricevuto dalla partizione di Hub eventi associata. |
Chiave di partizione | AzureHeaders#PARTITION_KEY | Elenco di stringhe | Elenco della chiave di hashing della partizione se è stata impostata durante la pubblicazione originale di ogni evento. |
Numero di sequenza | EventHubsHeaders#SEQUENCE_NUMBER | Elenco di long | Elenco del numero di sequenza assegnato a ogni evento quando è stato accodato nella partizione di Hub eventi associata. |
Proprietà di sistema | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Elenco di mappe | Elenco delle proprietà di sistema di ogni evento. |
Proprietà dell'applicazione | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Elenco di mappe | Elenco delle proprietà dell'applicazione di ogni evento, in cui vengono inserite tutte le intestazioni o le proprietà dell'evento personalizzate. |
Nota
Quando si pubblicano messaggi, tutte le intestazioni batch precedenti verranno rimosse dai messaggi, se presenti.
Campioni
Per altre informazioni, vedere il repository azure-spring-boot-samples
Integrazione spring con il bus di servizio di Azure
Concetti chiave
Spring Integration consente la messaggistica leggera all'interno di applicazioni basate su Spring e supporta l'integrazione con sistemi esterni tramite adattatori dichiarativi.
Il progetto di estensione Spring Integration per il bus di servizio di Azure fornisce schede di canale in ingresso e in uscita per il bus di servizio di Azure.
Nota
Le API di supporto completableFuture sono state deprecate dalla versione 2.10.0 ed è sostituita da Reactor Core dalla versione 4.0.0. Per informazioni dettagliate, vedere Javadoc.
Configurazione delle dipendenze
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configurazione
Questo strumento di avvio fornisce le due parti seguenti delle opzioni di configurazione:
Proprietà di configurazione della connessione
Questa sezione contiene le opzioni di configurazione usate per la connessione al bus di servizio di Azure.
Nota
Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.
Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-servicebus:
Proprietà | Digitare | Descrizione |
---|---|---|
spring.cloud.azure.servicebusabilitato | booleano | Indica se un bus di servizio di Azure è abilitato. |
spring.cloud.azure.servicebus.connection-string | Corda | Valore della stringa di connessione dello spazio dei nomi del bus di servizio. |
spring.cloud.azure.servicebus.custom-endpoint-address | Corda | Indirizzo dell'endpoint personalizzato da usare per la connessione al bus di servizio. |
spring.cloud.azure.servicebus.namespace | Corda | Valore spazio dei nomi del bus di servizio, ovvero il prefisso del nome di dominio completo. Un nome di dominio completo deve essere composto da NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Corda | Nome di dominio di un valore dello spazio dei nomi del bus di servizio di Azure. |
Proprietà di configurazione del processore del bus di servizio
Il ServiceBusInboundChannelAdapter
usa il ServiceBusProcessorClient
per utilizzare i messaggi, per configurare le proprietà complessive di un ServiceBusProcessorClient
, gli sviluppatori possono usare ServiceBusContainerProperties
per la configurazione. Vedere sezione seguente su come usare ServiceBusInboundChannelAdapter
.
Utilizzo di base
Inviare messaggi al bus di servizio di Azure
Compilare le opzioni di configurazione delle credenziali.
Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
I valori consentiti per tenant-id
sono: common
, organizations
, consumers
o l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.
Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nota
I valori consentiti per tenant-id
sono: common
, organizations
, consumers
o l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.
Creare
DefaultMessageHandler
con il beanServiceBusTemplate
per inviare messaggi al bus di servizio, impostare il tipo di entità per ServiceBusTemplate. Questo esempio accetta la coda del bus di servizio come esempio.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Creare un'associazione del gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Inviare messaggi usando il gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Ricevere messaggi dal bus di servizio di Azure
Compilare le opzioni di configurazione delle credenziali.
Creare un bean di canale di messaggio come canale di input.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Creare
ServiceBusInboundChannelAdapter
con il beanServiceBusMessageListenerContainer
per ricevere messaggi al bus di servizio. Questo esempio accetta la coda del bus di servizio come esempio.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Creare un'associazione di ricevitore di messaggi con
ServiceBusInboundChannelAdapter
tramite il canale di messaggio creato in precedenza.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurare ServiceBusMessageConverter per personalizzare objectMapper
ServiceBusMessageConverter
viene creato come bean configurabile per consentire agli utenti di personalizzare ObjectMapper
.
Intestazioni dei messaggi del bus di servizio
Per alcune intestazioni del bus di servizio di cui è possibile eseguire il mapping a più costanti di intestazione Spring, viene elencata la priorità di intestazioni Spring diverse.
Mapping tra intestazioni del bus di servizio e intestazioni Spring:
Intestazioni e proprietà del messaggio del bus di servizio | Costanti dell'intestazione del messaggio Spring | Digitare | Configurabile | Descrizione |
---|---|---|---|---|
Tipo di contenuto | MessageHeaders#CONTENT_TYPE |
Corda | Sì | Descrittore RFC2045 Content-Type del messaggio. |
ID correlazione | ServiceBusMessageHeaders#CORRELATION_ID |
Corda | Sì | ID di correlazione del messaggio |
ID messaggio | ServiceBusMessageHeaders#MESSAGE_ID |
Corda | Sì | ID messaggio del messaggio, questa intestazione ha priorità più alta rispetto a MessageHeaders#ID . |
ID messaggio | MessageHeaders#ID |
UUID | Sì | ID messaggio del messaggio, questa intestazione ha una priorità inferiore a ServiceBusMessageHeaders#MESSAGE_ID . |
Chiave di partizione | ServiceBusMessageHeaders#PARTITION_KEY |
Corda | Sì | Chiave di partizione per l'invio del messaggio a un'entità partizionata. |
Rispondi a | MessageHeaders#REPLY_CHANNEL |
Corda | Sì | Indirizzo di un'entità a cui inviare risposte. |
Rispondi all'ID sessione | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Corda | Sì | Valore della proprietà ReplyToGroupId del messaggio. |
Ora di accodamento pianificata utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Sì | Data/ora in cui il messaggio deve essere accodato nel bus di servizio, questa intestazione ha priorità più alta rispetto a AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Ora di accodamento pianificata utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Numero intero | Sì | Data/ora in cui il messaggio deve essere accodato nel bus di servizio, questa intestazione ha priorità inferiore rispetto a ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
ID sessione | ServiceBusMessageHeaders#SESSION_ID |
Corda | Sì | IDentifier di sessione per un'entità con riconoscimento della sessione. |
Durata (TTL) | ServiceBusMessageHeaders#TIME_TO_LIVE |
Durata | Sì | Durata dell'intervallo di tempo prima della scadenza del messaggio. |
A | ServiceBusMessageHeaders#TO |
Corda | Sì | Indirizzo "a" del messaggio, riservato per un uso futuro negli scenari di routing e attualmente ignorato dal broker stesso. |
Oggetto | ServiceBusMessageHeaders#SUBJECT |
Corda | Sì | Oggetto del messaggio. |
Descrizione dell'errore di messaggi non recapitabili | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Corda | No | Descrizione di un messaggio non recapitato. |
Motivo dei messaggi non recapitabili | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Corda | No | Il motivo per cui un messaggio è stato inviato a messaggi non recapitabili. |
Origine messaggi non recapitabili | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Corda | No | Entità in cui il messaggio è stato recapitato in un messaggio non recapitato. |
Numero di recapito | ServiceBusMessageHeaders#DELIVERY_COUNT |
lungo | No | Numero di volte in cui il messaggio è stato recapitato ai client. |
Numero di sequenza accodato | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
lungo | No | Numero di sequenza accodato assegnato a un messaggio dal bus di servizio. |
Tempo accodato | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | No | Data/ora in cui il messaggio è stato accodato nel bus di servizio. |
Scade all'indirizzo | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | No | Data/ora in cui il messaggio scadrà. |
Token di blocco | ServiceBusMessageHeaders#LOCK_TOKEN |
Corda | No | Token di blocco per il messaggio corrente. |
Bloccato fino a quando | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | No | Data/ora in cui scade il blocco del messaggio. |
Numero di sequenza | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
lungo | No | Numero univoco assegnato a un messaggio dal bus di servizio. |
Stato | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | No | Stato del messaggio, che può essere Attivo, Posticipato o Pianificato. |
Supporto della chiave di partizione
Questo starter supporta di partizionamento del bus di servizio consentendo di impostare la chiave di partizione e l'ID sessione nell'intestazione del messaggio. Questa sezione illustra come impostare la chiave di partizione per i messaggi.
Consigliato: Usare ServiceBusMessageHeaders.PARTITION_KEY
come chiave dell'intestazione.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Non consigliato ma attualmente supportato:AzureHeaders.PARTITION_KEY
come chiave dell'intestazione.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Quando sia ServiceBusMessageHeaders.PARTITION_KEY
che AzureHeaders.PARTITION_KEY
vengono impostati nelle intestazioni del messaggio, è preferibile ServiceBusMessageHeaders.PARTITION_KEY
.
Supporto delle sessioni
In questo esempio viene illustrato come impostare manualmente l'ID sessione di un messaggio nell'applicazione.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nota
Quando la ServiceBusMessageHeaders.SESSION_ID
viene impostata nelle intestazioni del messaggio e viene impostata anche un'intestazione di ServiceBusMessageHeaders.PARTITION_KEY
diversa, il valore dell'ID sessione verrà infine usato per sovrascrivere il valore della chiave di partizione.
Personalizzare le proprietà client del bus di servizio
Gli sviluppatori possono usare AzureServiceClientBuilderCustomizer
per personalizzare le proprietà client del bus di servizio. Nell'esempio seguente viene personalizzata la proprietà sessionIdleTimeout
in ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Campioni
Per altre informazioni, vedere il repository azure-spring-boot-samples
Integrazione spring con coda di archiviazione di Azure
Concetti chiave
Archiviazione code di Azure è un servizio per l'archiviazione di un numero elevato di messaggi. È possibile accedere ai messaggi da qualsiasi parte del mondo tramite chiamate autenticate tramite HTTP o HTTPS. Un messaggio in coda può avere dimensioni fino a 64 KB. Una coda può contenere milioni di messaggi, fino al limite di capacità totale di un account di archiviazione. Le code vengono comunemente usate per creare un backlog di lavoro da elaborare in modo asincrono.
Configurazione delle dipendenze
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configurazione
Questo strumento di avvio offre le opzioni di configurazione seguenti:
Proprietà di configurazione della connessione
Questa sezione contiene le opzioni di configurazione usate per la connessione alla coda di archiviazione di Azure.
Nota
Se si sceglie di usare un'entità di sicurezza per eseguire l'autenticazione e l'autorizzazione con Microsoft Entra ID per l'accesso a una risorsa di Azure, vedere Autorizzare l'accesso con Microsoft Entra ID per assicurarsi che all'entità di sicurezza sia stata concessa l'autorizzazione sufficiente per accedere alla risorsa di Azure.
Proprietà configurabili della connessione di spring-cloud-azure-starter-integration-storage-queue:
Proprietà | Digitare | Descrizione |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booleano | Indica se è abilitata una coda di archiviazione di Azure. |
spring.cloud.azure.storage.queue.connection-string | Corda | Valore della stringa di connessione dello spazio dei nomi della coda di archiviazione. |
spring.cloud.azure.storage.queue.accountName | Corda | Nome dell'account della coda di archiviazione. |
spring.cloud.azure.storage.queue.accountKey | Corda | Chiave dell'account della coda di archiviazione. |
spring.cloud.azure.storage.queueendpoint | Corda | Endpoint del servizio di accodamento archiviazione. |
spring.cloud.azure.storage.queue.sasToken | Corda | Credenziali del token di firma di accesso condiviso |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion usato durante l'esecuzione di richieste API. |
spring.cloud.azure.storage.queue.messageEncoding | Corda | Codifica dei messaggi della coda. |
Utilizzo di base
Inviare messaggi alla coda di archiviazione di Azure
Compilare le opzioni di configurazione delle credenziali.
Per le credenziali come stringa di connessione, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Per le credenziali come identità gestite, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
I valori consentiti per tenant-id
sono: common
, organizations
, consumers
o l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.
Per le credenziali come entità servizio, configurare le proprietà seguenti nel file application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nota
I valori consentiti per tenant-id
sono: common
, organizations
, consumers
o l'ID tenant. Per altre informazioni su questi valori, vedere la sezione Usato l'endpoint errato (account personali e dell'organizzazione) di Errore AADSTS50020 - L'account utente del provider di identità non esiste nel tenant. Per informazioni sulla conversione dell'app a tenant singolo, vedere Convertire l'app a tenant singolo in multi-tenant in Microsoft Entra ID.
Creare
DefaultMessageHandler
con il beanStorageQueueTemplate
per inviare messaggi alla coda di archiviazione.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Creare un'associazione gateway messaggio con il gestore di messaggi precedente tramite un canale di messaggio.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Inviare messaggi usando il gateway.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Ricevere messaggi dalla coda di archiviazione di Azure
Compilare le opzioni di configurazione delle credenziali.
Creare un bean di canale di messaggio come canale di input.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Creare
StorageQueueMessageSource
con il beanStorageQueueTemplate
per ricevere messaggi nella coda di archiviazione.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Creare un binding del ricevitore di messaggi con StorageQueueMessageSource creato nell'ultimo passaggio tramite il canale di messaggio creato in precedenza.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Campioni
Per altre informazioni, vedere il repository azure-spring-boot-samples