Podpora Spring Cloud Azure pro integraci Spring
Tento článek se vztahuje na:✅ verze 4.19.0 ✅ verze 5.19.0
Rozšíření Spring Integration pro Azure poskytuje adaptéry integrace Spring pro různé služby poskytované sadou Azure SDK pro Javu. Poskytujeme podporu integrace Spring pro tyto služby Azure: Event Hubs, Service Bus, Fronta úložiště. Následuje seznam podporovaných adaptérů:
-
spring-cloud-azure-starter-integration-eventhubs
– další informace najdete v tématu Integrace Spring se službou Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
– další informace najdete v tématu Integrace Spring se službou Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
– další informace najdete v tématu Integrace Spring s frontou azure Storage
Integrace Spring se službou Azure Event Hubs
Klíčové koncepty
Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro příjem událostí. Může přijímat a zpracovávat miliony událostí za sekundu. Data odesílaná do centra událostí je možné transformovat a ukládat pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů batchingu nebo úložiště.
Spring Integration umožňuje zjednodušené zasílání zpráv v aplikacích založených na Springu a podporuje integraci s externími systémy prostřednictvím deklarativních adaptérů. Tyto adaptéry poskytují vyšší úroveň abstrakce oproti podpoře vzdálené komunikace, zasílání zpráv a plánování. Integrace Spring pro službu Event Hubs rozšíření poskytuje adaptéry a brány pro příchozí a odchozí kanály pro službu Azure Event Hubs.
Poznámka
Rozhraní API podpory RxJava se zahodí z verze 4.0.0. Podrobnosti najdete v Javadocu.
Skupina příjemců
Event Hubs poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou. Zatímco Kafka ukládá všechny potvrzené posuny ve zprostředkovateli, musíte ukládat posuny zpráv Event Hubs zpracovávaných ručně. Sada Event Hubs SDK poskytuje funkci pro ukládání takových posunů ve službě Azure Storage.
Podpora dělení
Služba Event Hubs poskytuje podobný koncept fyzického oddílu jako Kafka. Na rozdíl od automatického vyrovnávání mezi uživateli a oddíly Kafka ale služba Event Hubs poskytuje druh preemptivního režimu. Účet úložiště funguje jako zapůjčení k určení, který oddíl vlastní uživatel. Když se spustí nový příjemce, pokusí se ukrást některé oddíly od většiny náročných příjemců, aby se dosáhlo vyrovnávání zatížení.
Pokud chcete určit strategii vyrovnávání zatížení, můžou vývojáři pro konfiguraci použít EventHubsContainerProperties
. Příklad konfigurace EventHubsContainerProperties
najdete v následující části.
Podpora příjemců služby Batch
EventHubsInboundChannelAdapter
podporuje režim dávkového využívání. Pokud ho chcete povolit, mohou uživatelé při vytváření instance EventHubsInboundChannelAdapter
určit režim naslouchacího procesu jako ListenerMode.BATCH
.
Pokud je tato možnost povolená, zpráva
Poznámka
Hlavička kontrolního bodu existuje pouze v případech, kdy se používá režim kontrolního bodu MANUAL.
Vytváření kontrolních bodů dávkového příjemce podporuje dva režimy: BATCH
a MANUAL
.
BATCH
režim je režim automatického vytváření kontrolních bodů pro vytvoření kontrolního bodu pro identifikaci celé dávky událostí po jejich přijetí.
MANUAL
režim slouží k vytvoření kontrolního bodu událostí uživateli. Při použití se Kontrolní boder předá do záhlaví zprávy a uživatelé ho můžou použít k vytváření kontrolních bodů.
Zásady dávkového využívání lze určit vlastnostmi max-size
a max-wait-time
, kde max-size
je nezbytná vlastnost, zatímco max-wait-time
je nepovinná.
Pokud chcete určit strategii dávkového využívání, můžou vývojáři pro konfiguraci použít EventHubsContainerProperties
. Příklad konfigurace EventHubsContainerProperties
najdete v následující části.
Nastavení závislostí
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfigurace
Tato úvodní sada poskytuje následující tři části možností konfigurace:
Vlastnosti konfigurace připojení
Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Event Hubs.
Poznámka
Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.
Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleovský | Určuje, jestli je služba Azure Event Hubs povolená. |
spring.cloud.azure.eventhubs.connection-string | Řetězec | Hodnota připojovacího řetězce oboru názvů služby Event Hubs |
spring.cloud.azure.eventhubs.namespace | Řetězec | Hodnota oboru názvů služby Event Hubs, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName. |
spring.cloud.azure.eventhubs.domain-name | Řetězec | Název domény hodnoty oboru názvů služby Azure Event Hubs |
spring.cloud.azure.eventhubs.custom-endpoint-address | Řetězec | Adresa vlastního koncového bodu |
spring.cloud.azure.eventhubs.shared-connection | Booleovský | Určuje, jestli základní EventProcessorClient a EventHubProducerAsyncClient používají stejné připojení. Ve výchozím nastavení se vytvoří nové připojení a použije se pro každého vytvořeného klienta centra událostí. |
Vlastnosti konfigurace kontrolního bodu
Tato část obsahuje možnosti konfigurace pro službu Storage Blobs, která se používá k zachování vlastnictví oddílu a informací kontrolních bodů.
Poznámka
Ve verzi 4.0.0 není vlastnost spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists povolena ručně, nebude automaticky vytvořen žádný kontejner úložiště.
Konfigurovatelné vlastnosti kontrolních bodů spring-cloud-azure-starter-integration-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleovský | Zda chcete povolit vytváření kontejnerů, pokud neexistuje. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Řetězec | Název účtu úložiště |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Řetězec | Přístupový klíč účtu úložiště. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Řetězec | Název kontejneru úložiště |
Běžné možnosti konfigurace sady SDK služby Azure jsou konfigurovatelné i pro úložiště kontrolních bodů objektů blob služby Storage. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure.
nebo předpony spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Vlastnosti konfigurace procesoru centra událostí
EventHubsInboundChannelAdapter
používá EventProcessorClient
ke zpracování zpráv z centra událostí, ke konfiguraci celkových vlastností EventProcessorClient
mohou vývojáři použít EventHubsContainerProperties
pro konfiguraci. Přečtěte si následující části o tom, jak pracovat s EventHubsInboundChannelAdapter
.
Základní využití
Odesílání zpráv do služby Azure Event Hubs
Vyplňte možnosti konfigurace přihlašovacích údajů.
Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Vytvořte
DefaultMessageHandler
pomocíEventHubsTemplate
bean pro odesílání zpráv do služby Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Odesílání zpráv pomocí brány
class Demo { public void demo() { this.messagingGateway.send(message); } }
Příjem zpráv ze služby Azure Event Hubs
Vyplňte možnosti konfigurace přihlašovacích údajů.
Vytvořte jako vstupní kanál bean kanálu zprávy.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Vytvořte
EventHubsInboundChannelAdapter
pomocíEventHubsMessageListenerContainer
bean pro příjem zpráv ze služby Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Vytvořte vazbu příjemce zprávy pomocí eventHubsInboundChannelAdapter prostřednictvím kanálu zprávy vytvořeného dříve.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurace EventHubsMessageConverter pro přizpůsobení objectMapper
EventHubsMessageConverter
se vytvoří jako konfigurovatelná bean, která uživatelům umožní přizpůsobit ObjectMapper.
Podpora příjemců služby Batch
Pokud chcete využívat zprávy ze služby Event Hubs v dávkách, je podobné jako u výše uvedené ukázky, kromě toho by uživatelé měli nastavit možnosti konfigurace související s dávkovou spotřebou pro EventHubsInboundChannelAdapter
.
Při vytváření EventHubsInboundChannelAdapter
by měl být režim naslouchacího procesu nastaven jako BATCH
. Při vytváření bean z EventHubsMessageListenerContainer
nastavte režim kontrolního bodu na MANUAL
nebo BATCH
a možnosti dávky lze podle potřeby nakonfigurovat.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Hlavičky zpráv služby Event Hubs
Následující tabulka ukazuje, jak jsou vlastnosti zpráv služby Event Hubs mapovány na hlavičky zpráv Spring. U služby Azure Event Hubs se zpráva volá jako event
.
Mapování mezi zprávou event hubs / vlastnostmi události a záhlavími springových zpráv v režimu naslouchacího procesu záznamu:
Vlastnosti událostí služby Event Hubs | Konstanty záhlaví springových zpráv | Typ | Popis |
---|---|---|---|
Čas zařazení do fronty | EventHubsHeaders#ENQUEUED_TIME | Okamžitý | Okamžitě v UTC, kdy byla událost zapsána do fronty v oddílu centra událostí. |
Ofset | EventHubsHeaders#OFFSET | Dlouhý | Posun události při přijetí z přidruženého oddílu centra událostí. |
Klíč oddílu | AzureHeaders#PARTITION_KEY | Řetězec | Klíč hashování oddílu, pokud byl nastaven při původním publikování události. |
ID oddílu | AzureHeaders#RAW_PARTITION_ID | Řetězec | ID oddílu centra událostí. |
Pořadové číslo | EventHubsHeaders#SEQUENCE_NUMBER | Dlouhý | Pořadové číslo přiřazené události, když byla zařazena do fronty v přidruženém oddílu centra událostí. |
Vlastnosti poslední události ve frontě | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Vlastnosti poslední události v tomto oddílu. |
SODÍK | AzureHeaders#CHECKPOINTER | Kontrolní bod | Záhlaví pro kontrolní bod konkrétní zprávy. |
Uživatelé mohou analyzovat záhlaví zprávy pro související informace o každé události. Pokud chcete nastavit záhlaví zprávy pro událost, budou všechna přizpůsobená záhlaví vložena jako vlastnost aplikace události, kde je hlavička nastavena jako klíč vlastnosti. Při přijetí událostí ze služby Event Hubs se všechny vlastnosti aplikace převedou do záhlaví zprávy.
Poznámka
Záhlaví zpráv klíče oddílu, čas zařazení do fronty, posun a pořadové číslo není podporováno ruční nastavení.
Pokud je povolený režim dávkového příjemce, zobrazí se následující konkrétní hlavičky dávkových zpráv, které obsahují seznam hodnot z každé události služby Event Hubs.
Mapování mezi zprávami event hubs / vlastnostmi události a hlavičkami spring zpráv v režimu dávkového naslouchacího procesu:
Vlastnosti událostí služby Event Hubs | Konstanty záhlaví zpráv Spring Batch | Typ | Popis |
---|---|---|---|
Čas zařazení do fronty | EventHubsHeaders#ENQUEUED_TIME | Seznam okamžitých | Seznam okamžité události ve standardu UTC, kdy byla každá událost zapsána do fronty v oddílu centra událostí. |
Ofset | EventHubsHeaders#OFFSET | Seznam dlouhých | Seznam posunu každé události při přijetí z přidruženého oddílu centra událostí |
Klíč oddílu | AzureHeaders#PARTITION_KEY | Seznam řetězců | Seznam klíče hash oddílu, pokud byl nastaven při původním publikování každé události. |
Pořadové číslo | EventHubsHeaders#SEQUENCE_NUMBER | Seznam dlouhých | Seznam pořadového čísla přiřazeného každé události, když byla zařazena do fronty v přidruženém oddílu centra událostí. |
Systémové vlastnosti | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Seznam map | Seznam systémových vlastností každé události |
Vlastnosti aplikace | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Seznam map | Seznam vlastností aplikace každé události, kde jsou umístěny všechny přizpůsobené hlavičky zpráv nebo vlastnosti události. |
Poznámka
Při publikování zpráv budou všechny výše uvedené dávkové hlavičky odebrány ze zpráv, pokud existují.
Vzorky
Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.
Integrace Spring se službou Azure Service Bus
Klíčové koncepty
Spring Integration umožňuje zjednodušené zasílání zpráv v aplikacích založených na Springu a podporuje integraci s externími systémy prostřednictvím deklarativních adaptérů.
Projekt rozšíření Spring Integration for Azure Service Bus poskytuje adaptéry příchozích a odchozích kanálů pro Službu Azure Service Bus.
Poznámka
Rozhraní API podpory CompletableFuture jsou od verze 2.10.0 zastaralá a nahrazuje jádro Reactor verze 4.0.0. Podrobnosti najdete v Javadocu.
Nastavení závislostí
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfigurace
Tato úvodní sada poskytuje následující dvě části možností konfigurace:
Vlastnosti konfigurace připojení
Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Service Bus.
Poznámka
Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.
Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-servicebus:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleovský | Určuje, jestli je povolená služba Azure Service Bus. |
spring.cloud.azure.servicebus.connection-string | Řetězec | Hodnota připojovacího řetězce oboru názvů služby Service Bus |
spring.cloud.azure.servicebus.custom-endpoint-address | Řetězec | Vlastní adresa koncového bodu, která se má použít při připojování ke službě Service Bus. |
spring.cloud.azure.servicebus.namespace | Řetězec | Hodnota oboru názvů služby Service Bus, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName. |
spring.cloud.azure.servicebus.domain-name | Řetězec | Název domény hodnoty oboru názvů služby Azure Service Bus |
Vlastnosti konfigurace procesoru služby Service Bus
ServiceBusInboundChannelAdapter
používá ServiceBusProcessorClient
ke zpracování zpráv, ke konfiguraci celkových vlastností ServiceBusProcessorClient
, mohou vývojáři pro konfiguraci použít ServiceBusContainerProperties
. Přečtěte si následující části o tom, jak pracovat s ServiceBusInboundChannelAdapter
.
Základní využití
Odesílání zpráv do služby Azure Service Bus
Vyplňte možnosti konfigurace přihlašovacích údajů.
Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Vytvořte
DefaultMessageHandler
pomocíServiceBusTemplate
bean pro odesílání zpráv do služby Service Bus a nastavte typ entity pro ServiceBusTemplate. V této ukázce se například používá fronta služby Service Bus.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Odesílání zpráv pomocí brány
class Demo { public void demo() { this.messagingGateway.send(message); } }
Příjem zpráv ze služby Azure Service Bus
Vyplňte možnosti konfigurace přihlašovacích údajů.
Vytvořte jako vstupní kanál bean kanálu zprávy.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Vytvořte
ServiceBusInboundChannelAdapter
sServiceBusMessageListenerContainer
bean pro příjem zpráv do služby Service Bus. V této ukázce se například používá fronta služby Service Bus.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Vytvořte vazbu příjemce zprávy s
ServiceBusInboundChannelAdapter
prostřednictvím kanálu zprávy, který jsme vytvořili dříve.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurace ServiceBusMessageConverter pro přizpůsobení objectMapper
ServiceBusMessageConverter
se vytvoří jako konfigurovatelná bean, která uživatelům umožní přizpůsobit ObjectMapper
.
Hlavičky zpráv služby Service Bus
U některých hlaviček služby Service Bus, které je možné mapovat na více konstant záhlaví Spring, je uvedena priorita různých hlaviček Spring.
Mapování mezi hlavičkami služby Service Bus a spring headers:
Hlavičky a vlastnosti zpráv služby Service Bus | Konstanty záhlaví spring zprávy | Typ | Konfigurovatelný | Popis |
---|---|---|---|---|
Typ obsahu | MessageHeaders#CONTENT_TYPE |
Řetězec | Ano | Popisovač typu obsahu zprávy RFC2045. |
ID korelace | ServiceBusMessageHeaders#CORRELATION_ID |
Řetězec | Ano | ID korelace zprávy |
ID zprávy | ServiceBusMessageHeaders#MESSAGE_ID |
Řetězec | Ano | ID zprávy má toto záhlaví vyšší prioritu než MessageHeaders#ID . |
ID zprávy | MessageHeaders#ID |
UUID | Ano | ID zprávy má toto záhlaví nižší prioritu než ServiceBusMessageHeaders#MESSAGE_ID . |
Klíč oddílu | ServiceBusMessageHeaders#PARTITION_KEY |
Řetězec | Ano | Klíč oddílu pro odeslání zprávy do dělené entity. |
Odpovědět na | MessageHeaders#REPLY_CHANNEL |
Řetězec | Ano | Adresa entity, na kterou se mají odesílat odpovědi. |
Odpověď na ID relace | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Řetězec | Ano | Hodnota vlastnosti ReplyToGroupId zprávy. |
Naplánovaný čas fronty utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ano | Datum a čas, kdy má být zpráva ve frontě ve službě Service Bus, má tato hlavička vyšší prioritu než AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Naplánovaný čas fronty utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Celé číslo | Ano | Datum a čas, kdy má být zpráva ve frontě ve službě Service Bus, má tato hlavička nižší prioritu než ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
ID relace | ServiceBusMessageHeaders#SESSION_ID |
Řetězec | Ano | IDentifier relace pro entitu pracující s relací. |
Čas k živému provozu | ServiceBusMessageHeaders#TIME_TO_LIVE |
Trvání | Ano | Doba trvání před vypršením platnosti této zprávy. |
K | ServiceBusMessageHeaders#TO |
Řetězec | Ano | Adresa "komu" zprávy, vyhrazená pro budoucí použití ve scénářích směrování a v současnosti ignorována samotným zprostředkovatelem. |
Předmět | ServiceBusMessageHeaders#SUBJECT |
Řetězec | Ano | Předmět zprávy. |
Popis chyby nedoručených zpráv | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Řetězec | Ne | Popis zprávy, která byla nedoručována. |
Důvod nedoručených dopisů | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Řetězec | Ne | Důvod, proč byla zpráva nedoručována. |
Zdroj nedoručených dopisů | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Řetězec | Ne | Entita, ve které byla zpráva nedoručována. |
Počet doručení | ServiceBusMessageHeaders#DELIVERY_COUNT |
dlouhý | Ne | Počet doručení této zprávy klientům. |
Pořadové číslo ve frontě | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
dlouhý | Ne | Pořadové číslo přiřazené ke zprávě službou Service Bus. |
Čas zařazení do fronty | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Ne | Datum a čas, kdy byla tato zpráva zapsána do fronty ve službě Service Bus. |
Vyprší na adrese |
ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Ne | Datum a čas, kdy platnost této zprávy vyprší. |
Zámek tokenu | ServiceBusMessageHeaders#LOCK_TOKEN |
Řetězec | Ne | Token zámku pro aktuální zprávu. |
Uzamčeno do | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Ne | Datum a čas, kdy vyprší platnost zámku této zprávy. |
Pořadové číslo | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
dlouhý | Ne | Jedinečné číslo přiřazené ke zprávě službou Service Bus. |
Stát | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Ne | Stav zprávy, která může být Aktivní, Odloženo nebo Naplánováno. |
Podpora klíče oddílu
Tato úvodní sada podporuje dělení služby Service Bus povolením nastavení klíče oddílu a ID relace v hlavičce zprávy. V této části se dozvíte, jak nastavit klíč oddílu pro zprávy.
Doporučeno: jako klíč hlavičky použijte ServiceBusMessageHeaders.PARTITION_KEY
.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nedoporučuje se, ale aktuálně se podporuje:AzureHeaders.PARTITION_KEY
jako klíč hlavičky.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Poznámka
Pokud jsou v záhlaví zprávy nastavené ServiceBusMessageHeaders.PARTITION_KEY
i AzureHeaders.PARTITION_KEY
, je preferován ServiceBusMessageHeaders.PARTITION_KEY
.
Podpora relací
Tento příklad ukazuje, jak ručně nastavit ID relace zprávy v aplikaci.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Poznámka
Pokud je ServiceBusMessageHeaders.SESSION_ID
nastavena v záhlaví zprávy a je nastavena jiná hlavička ServiceBusMessageHeaders.PARTITION_KEY
, hodnota ID relace se nakonec použije k přepsání hodnoty klíče oddílu.
Přizpůsobení vlastností klienta služby Service Bus
Vývojáři můžou pomocí AzureServiceClientBuilderCustomizer
přizpůsobit vlastnosti klienta služby Service Bus. Následující příklad přizpůsobí vlastnost sessionIdleTimeout
v ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Vzorky
Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.
Integrace Spring s frontou azure Storage
Klíčové koncepty
Azure Queue Storage je služba pro ukládání velkého počtu zpráv. Ke zprávům se dostanete odkudkoli na světě prostřednictvím ověřených volání pomocí protokolu HTTP nebo HTTPS. Zpráva fronty může mít velikost až 64 kB. Fronta může obsahovat miliony zpráv až do celkového limitu kapacity účtu úložiště. Fronty se běžně používají k vytvoření backlogu práce pro asynchronní zpracování.
Nastavení závislostí
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfigurace
Tato úvodní sada nabízí následující možnosti konfigurace:
Vlastnosti konfigurace připojení
Tato část obsahuje možnosti konfigurace používané pro připojení k frontě služby Azure Storage.
Poznámka
Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.
Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-storage-queue:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booleovský | Určuje, jestli je povolená fronta služby Azure Storage. |
spring.cloud.azure.storage.queue.connection-string | Řetězec | Hodnota připojovacího řetězce fronty služby Storage |
spring.cloud.azure.storage.queue.accountName | Řetězec | Název účtu fronty úložiště |
spring.cloud.azure.storage.queue.accountKey | Řetězec | Klíč účtu fronty úložiště. |
spring.cloud.azure.storage.queue.endpoint | Řetězec | Koncový bod služby Fronta úložiště |
spring.cloud.azure.storage.queue.sasToken | Řetězec | Přihlašovací údaje tokenu Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion, který se používá při vytváření požadavků rozhraní API. |
spring.cloud.azure.storage.queue.messageEncoding | Řetězec | Kódování zpráv ve frontě |
Základní využití
Odesílání zpráv do fronty služby Azure Storage
Vyplňte možnosti konfigurace přihlašovacích údajů.
Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Vytvořte
DefaultMessageHandler
pomocíStorageQueueTemplate
bean pro odesílání zpráv do fronty úložiště.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Odesílání zpráv pomocí brány
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Příjem zpráv z fronty služby Azure Storage
Vyplňte možnosti konfigurace přihlašovacích údajů.
Vytvořte jako vstupní kanál bean kanálu zprávy.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Vytvořte
StorageQueueMessageSource
sStorageQueueTemplate
bean pro příjem zpráv do fronty úložiště.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Vytvořte vazbu příjemce zprávy pomocí StorageQueueMessageSource vytvořeného v posledním kroku prostřednictvím kanálu zprávy, který jsme vytvořili dříve.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Vzorky
Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.