Sdílet prostřednictvím


Podpora Spring Cloud Azure pro integraci Spring

Tento článek se vztahuje na:✅ verze 4.19.0 ✅ verze 5.19.0

Rozšíření Spring Integration pro Azure poskytuje adaptéry integrace Spring pro různé služby poskytované sadou Azure SDK pro Javu. Poskytujeme podporu integrace Spring pro tyto služby Azure: Event Hubs, Service Bus, Fronta úložiště. Následuje seznam podporovaných adaptérů:

Integrace Spring se službou Azure Event Hubs

Klíčové koncepty

Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro příjem událostí. Může přijímat a zpracovávat miliony událostí za sekundu. Data odesílaná do centra událostí je možné transformovat a ukládat pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů batchingu nebo úložiště.

Spring Integration umožňuje zjednodušené zasílání zpráv v aplikacích založených na Springu a podporuje integraci s externími systémy prostřednictvím deklarativních adaptérů. Tyto adaptéry poskytují vyšší úroveň abstrakce oproti podpoře vzdálené komunikace, zasílání zpráv a plánování. Integrace Spring pro službu Event Hubs rozšíření poskytuje adaptéry a brány pro příchozí a odchozí kanály pro službu Azure Event Hubs.

Poznámka

Rozhraní API podpory RxJava se zahodí z verze 4.0.0. Podrobnosti najdete v Javadocu.

Skupina příjemců

Event Hubs poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou. Zatímco Kafka ukládá všechny potvrzené posuny ve zprostředkovateli, musíte ukládat posuny zpráv Event Hubs zpracovávaných ručně. Sada Event Hubs SDK poskytuje funkci pro ukládání takových posunů ve službě Azure Storage.

Podpora dělení

Služba Event Hubs poskytuje podobný koncept fyzického oddílu jako Kafka. Na rozdíl od automatického vyrovnávání mezi uživateli a oddíly Kafka ale služba Event Hubs poskytuje druh preemptivního režimu. Účet úložiště funguje jako zapůjčení k určení, který oddíl vlastní uživatel. Když se spustí nový příjemce, pokusí se ukrást některé oddíly od většiny náročných příjemců, aby se dosáhlo vyrovnávání zatížení.

Pokud chcete určit strategii vyrovnávání zatížení, můžou vývojáři pro konfiguraci použít EventHubsContainerProperties. Příklad konfigurace EventHubsContainerPropertiesnajdete v následující části.

Podpora příjemců služby Batch

EventHubsInboundChannelAdapter podporuje režim dávkového využívání. Pokud ho chcete povolit, mohou uživatelé při vytváření instance EventHubsInboundChannelAdapter určit režim naslouchacího procesu jako ListenerMode.BATCH. Pokud je tato možnost povolená, zpráva , ze které je datová část seznamem dávkových událostí, se přijme a předá do podřízeného kanálu. Každé záhlaví zprávy je také převedeno jako seznam, jehož obsah je přidružená hodnota záhlaví parsovaná z každé události. Pro společné hlavičky ID oddílu, kontrolní boder a poslední zařazené vlastnosti jsou prezentovány jako jedna hodnota pro celou dávku událostí sdílí stejný. Další informace najdete v části Hlavičky zpráv služby Event Hubs.

Poznámka

Hlavička kontrolního bodu existuje pouze v případech, kdy se používá režim kontrolního bodu MANUAL.

Vytváření kontrolních bodů dávkového příjemce podporuje dva režimy: BATCH a MANUAL. BATCH režim je režim automatického vytváření kontrolních bodů pro vytvoření kontrolního bodu pro identifikaci celé dávky událostí po jejich přijetí. MANUAL režim slouží k vytvoření kontrolního bodu událostí uživateli. Při použití se Kontrolní boder předá do záhlaví zprávy a uživatelé ho můžou použít k vytváření kontrolních bodů.

Zásady dávkového využívání lze určit vlastnostmi max-size a max-wait-time, kde max-size je nezbytná vlastnost, zatímco max-wait-time je nepovinná. Pokud chcete určit strategii dávkového využívání, můžou vývojáři pro konfiguraci použít EventHubsContainerProperties. Příklad konfigurace EventHubsContainerPropertiesnajdete v následující části.

Nastavení závislostí

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfigurace

Tato úvodní sada poskytuje následující tři části možností konfigurace:

Vlastnosti konfigurace připojení

Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Event Hubs.

Poznámka

Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.

Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-eventhubs:

Vlastnost Typ Popis
spring.cloud.azure.eventhubs.enabled booleovský Určuje, jestli je služba Azure Event Hubs povolená.
spring.cloud.azure.eventhubs.connection-string Řetězec Hodnota připojovacího řetězce oboru názvů služby Event Hubs
spring.cloud.azure.eventhubs.namespace Řetězec Hodnota oboru názvů služby Event Hubs, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName.
spring.cloud.azure.eventhubs.domain-name Řetězec Název domény hodnoty oboru názvů služby Azure Event Hubs
spring.cloud.azure.eventhubs.custom-endpoint-address Řetězec Adresa vlastního koncového bodu
spring.cloud.azure.eventhubs.shared-connection Booleovský Určuje, jestli základní EventProcessorClient a EventHubProducerAsyncClient používají stejné připojení. Ve výchozím nastavení se vytvoří nové připojení a použije se pro každého vytvořeného klienta centra událostí.

Vlastnosti konfigurace kontrolního bodu

Tato část obsahuje možnosti konfigurace pro službu Storage Blobs, která se používá k zachování vlastnictví oddílu a informací kontrolních bodů.

Poznámka

Ve verzi 4.0.0 není vlastnost spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists povolena ručně, nebude automaticky vytvořen žádný kontejner úložiště.

Konfigurovatelné vlastnosti kontrolních bodů spring-cloud-azure-starter-integration-eventhubs:

Vlastnost Typ Popis
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleovský Zda chcete povolit vytváření kontejnerů, pokud neexistuje.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Řetězec Název účtu úložiště
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Řetězec Přístupový klíč účtu úložiště.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Řetězec Název kontejneru úložiště

Běžné možnosti konfigurace sady SDK služby Azure jsou konfigurovatelné i pro úložiště kontrolních bodů objektů blob služby Storage. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure. nebo předpony spring.cloud.azure.eventhubs.processor.checkpoint-store.

Vlastnosti konfigurace procesoru centra událostí

EventHubsInboundChannelAdapter používá EventProcessorClient ke zpracování zpráv z centra událostí, ke konfiguraci celkových vlastností EventProcessorClientmohou vývojáři použít EventHubsContainerProperties pro konfiguraci. Přečtěte si následující části o tom, jak pracovat s EventHubsInboundChannelAdapter.

Základní využití

Odesílání zpráv do služby Azure Event Hubs

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

    • Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  1. Vytvořte DefaultMessageHandler pomocí EventHubsTemplate bean pro odesílání zpráv do služby Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Odesílání zpráv pomocí brány

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Příjem zpráv ze služby Azure Event Hubs

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

  2. Vytvořte jako vstupní kanál bean kanálu zprávy.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Vytvořte EventHubsInboundChannelAdapter pomocí EventHubsMessageListenerContainer bean pro příjem zpráv ze služby Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Vytvořte vazbu příjemce zprávy pomocí eventHubsInboundChannelAdapter prostřednictvím kanálu zprávy vytvořeného dříve.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurace EventHubsMessageConverter pro přizpůsobení objectMapper

EventHubsMessageConverter se vytvoří jako konfigurovatelná bean, která uživatelům umožní přizpůsobit ObjectMapper.

Podpora příjemců služby Batch

Pokud chcete využívat zprávy ze služby Event Hubs v dávkách, je podobné jako u výše uvedené ukázky, kromě toho by uživatelé měli nastavit možnosti konfigurace související s dávkovou spotřebou pro EventHubsInboundChannelAdapter.

Při vytváření EventHubsInboundChannelAdapterby měl být režim naslouchacího procesu nastaven jako BATCH. Při vytváření bean z EventHubsMessageListenerContainernastavte režim kontrolního bodu na MANUAL nebo BATCHa možnosti dávky lze podle potřeby nakonfigurovat.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Hlavičky zpráv služby Event Hubs

Následující tabulka ukazuje, jak jsou vlastnosti zpráv služby Event Hubs mapovány na hlavičky zpráv Spring. U služby Azure Event Hubs se zpráva volá jako event.

Mapování mezi zprávou event hubs / vlastnostmi události a záhlavími springových zpráv v režimu naslouchacího procesu záznamu:

Vlastnosti událostí služby Event Hubs Konstanty záhlaví springových zpráv Typ Popis
Čas zařazení do fronty EventHubsHeaders#ENQUEUED_TIME Okamžitý Okamžitě v UTC, kdy byla událost zapsána do fronty v oddílu centra událostí.
Ofset EventHubsHeaders#OFFSET Dlouhý Posun události při přijetí z přidruženého oddílu centra událostí.
Klíč oddílu AzureHeaders#PARTITION_KEY Řetězec Klíč hashování oddílu, pokud byl nastaven při původním publikování události.
ID oddílu AzureHeaders#RAW_PARTITION_ID Řetězec ID oddílu centra událostí.
Pořadové číslo EventHubsHeaders#SEQUENCE_NUMBER Dlouhý Pořadové číslo přiřazené události, když byla zařazena do fronty v přidruženém oddílu centra událostí.
Vlastnosti poslední události ve frontě EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Vlastnosti poslední události v tomto oddílu.
SODÍK AzureHeaders#CHECKPOINTER Kontrolní bod Záhlaví pro kontrolní bod konkrétní zprávy.

Uživatelé mohou analyzovat záhlaví zprávy pro související informace o každé události. Pokud chcete nastavit záhlaví zprávy pro událost, budou všechna přizpůsobená záhlaví vložena jako vlastnost aplikace události, kde je hlavička nastavena jako klíč vlastnosti. Při přijetí událostí ze služby Event Hubs se všechny vlastnosti aplikace převedou do záhlaví zprávy.

Poznámka

Záhlaví zpráv klíče oddílu, čas zařazení do fronty, posun a pořadové číslo není podporováno ruční nastavení.

Pokud je povolený režim dávkového příjemce, zobrazí se následující konkrétní hlavičky dávkových zpráv, které obsahují seznam hodnot z každé události služby Event Hubs.

Mapování mezi zprávami event hubs / vlastnostmi události a hlavičkami spring zpráv v režimu dávkového naslouchacího procesu:

Vlastnosti událostí služby Event Hubs Konstanty záhlaví zpráv Spring Batch Typ Popis
Čas zařazení do fronty EventHubsHeaders#ENQUEUED_TIME Seznam okamžitých Seznam okamžité události ve standardu UTC, kdy byla každá událost zapsána do fronty v oddílu centra událostí.
Ofset EventHubsHeaders#OFFSET Seznam dlouhých Seznam posunu každé události při přijetí z přidruženého oddílu centra událostí
Klíč oddílu AzureHeaders#PARTITION_KEY Seznam řetězců Seznam klíče hash oddílu, pokud byl nastaven při původním publikování každé události.
Pořadové číslo EventHubsHeaders#SEQUENCE_NUMBER Seznam dlouhých Seznam pořadového čísla přiřazeného každé události, když byla zařazena do fronty v přidruženém oddílu centra událostí.
Systémové vlastnosti EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Seznam map Seznam systémových vlastností každé události
Vlastnosti aplikace EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Seznam map Seznam vlastností aplikace každé události, kde jsou umístěny všechny přizpůsobené hlavičky zpráv nebo vlastnosti události.

Poznámka

Při publikování zpráv budou všechny výše uvedené dávkové hlavičky odebrány ze zpráv, pokud existují.

Vzorky

Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.

Integrace Spring se službou Azure Service Bus

Klíčové koncepty

Spring Integration umožňuje zjednodušené zasílání zpráv v aplikacích založených na Springu a podporuje integraci s externími systémy prostřednictvím deklarativních adaptérů.

Projekt rozšíření Spring Integration for Azure Service Bus poskytuje adaptéry příchozích a odchozích kanálů pro Službu Azure Service Bus.

Poznámka

Rozhraní API podpory CompletableFuture jsou od verze 2.10.0 zastaralá a nahrazuje jádro Reactor verze 4.0.0. Podrobnosti najdete v Javadocu.

Nastavení závislostí

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfigurace

Tato úvodní sada poskytuje následující dvě části možností konfigurace:

Vlastnosti konfigurace připojení

Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Service Bus.

Poznámka

Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.

Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-servicebus:

Vlastnost Typ Popis
spring.cloud.azure.servicebus.enabled booleovský Určuje, jestli je povolená služba Azure Service Bus.
spring.cloud.azure.servicebus.connection-string Řetězec Hodnota připojovacího řetězce oboru názvů služby Service Bus
spring.cloud.azure.servicebus.custom-endpoint-address Řetězec Vlastní adresa koncového bodu, která se má použít při připojování ke službě Service Bus.
spring.cloud.azure.servicebus.namespace Řetězec Hodnota oboru názvů služby Service Bus, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName.
spring.cloud.azure.servicebus.domain-name Řetězec Název domény hodnoty oboru názvů služby Azure Service Bus

Vlastnosti konfigurace procesoru služby Service Bus

ServiceBusInboundChannelAdapter používá ServiceBusProcessorClient ke zpracování zpráv, ke konfiguraci celkových vlastností ServiceBusProcessorClient, mohou vývojáři pro konfiguraci použít ServiceBusContainerProperties. Přečtěte si následující části o tom, jak pracovat s ServiceBusInboundChannelAdapter.

Základní využití

Odesílání zpráv do služby Azure Service Bus

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

    • Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  • Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  1. Vytvořte DefaultMessageHandler pomocí ServiceBusTemplate bean pro odesílání zpráv do služby Service Bus a nastavte typ entity pro ServiceBusTemplate. V této ukázce se například používá fronta služby Service Bus.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Odesílání zpráv pomocí brány

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Příjem zpráv ze služby Azure Service Bus

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

  2. Vytvořte jako vstupní kanál bean kanálu zprávy.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Vytvořte ServiceBusInboundChannelAdapter s ServiceBusMessageListenerContainer bean pro příjem zpráv do služby Service Bus. V této ukázce se například používá fronta služby Service Bus.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Vytvořte vazbu příjemce zprávy s ServiceBusInboundChannelAdapter prostřednictvím kanálu zprávy, který jsme vytvořili dříve.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurace ServiceBusMessageConverter pro přizpůsobení objectMapper

ServiceBusMessageConverter se vytvoří jako konfigurovatelná bean, která uživatelům umožní přizpůsobit ObjectMapper.

Hlavičky zpráv služby Service Bus

U některých hlaviček služby Service Bus, které je možné mapovat na více konstant záhlaví Spring, je uvedena priorita různých hlaviček Spring.

Mapování mezi hlavičkami služby Service Bus a spring headers:

Hlavičky a vlastnosti zpráv služby Service Bus Konstanty záhlaví spring zprávy Typ Konfigurovatelný Popis
Typ obsahu MessageHeaders#CONTENT_TYPE Řetězec Ano Popisovač typu obsahu zprávy RFC2045.
ID korelace ServiceBusMessageHeaders#CORRELATION_ID Řetězec Ano ID korelace zprávy
ID zprávy ServiceBusMessageHeaders#MESSAGE_ID Řetězec Ano ID zprávy má toto záhlaví vyšší prioritu než MessageHeaders#ID.
ID zprávy MessageHeaders#ID UUID Ano ID zprávy má toto záhlaví nižší prioritu než ServiceBusMessageHeaders#MESSAGE_ID.
Klíč oddílu ServiceBusMessageHeaders#PARTITION_KEY Řetězec Ano Klíč oddílu pro odeslání zprávy do dělené entity.
Odpovědět na MessageHeaders#REPLY_CHANNEL Řetězec Ano Adresa entity, na kterou se mají odesílat odpovědi.
Odpověď na ID relace ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Řetězec Ano Hodnota vlastnosti ReplyToGroupId zprávy.
Naplánovaný čas fronty utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ano Datum a čas, kdy má být zpráva ve frontě ve službě Service Bus, má tato hlavička vyšší prioritu než AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Naplánovaný čas fronty utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Celé číslo Ano Datum a čas, kdy má být zpráva ve frontě ve službě Service Bus, má tato hlavička nižší prioritu než ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID relace ServiceBusMessageHeaders#SESSION_ID Řetězec Ano IDentifier relace pro entitu pracující s relací.
Čas k živému provozu ServiceBusMessageHeaders#TIME_TO_LIVE Trvání Ano Doba trvání před vypršením platnosti této zprávy.
K ServiceBusMessageHeaders#TO Řetězec Ano Adresa "komu" zprávy, vyhrazená pro budoucí použití ve scénářích směrování a v současnosti ignorována samotným zprostředkovatelem.
Předmět ServiceBusMessageHeaders#SUBJECT Řetězec Ano Předmět zprávy.
Popis chyby nedoručených zpráv ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Řetězec Ne Popis zprávy, která byla nedoručována.
Důvod nedoručených dopisů ServiceBusMessageHeaders#DEAD_LETTER_REASON Řetězec Ne Důvod, proč byla zpráva nedoručována.
Zdroj nedoručených dopisů ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Řetězec Ne Entita, ve které byla zpráva nedoručována.
Počet doručení ServiceBusMessageHeaders#DELIVERY_COUNT dlouhý Ne Počet doručení této zprávy klientům.
Pořadové číslo ve frontě ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER dlouhý Ne Pořadové číslo přiřazené ke zprávě službou Service Bus.
Čas zařazení do fronty ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Ne Datum a čas, kdy byla tato zpráva zapsána do fronty ve službě Service Bus.
Vyprší na adrese . ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Ne Datum a čas, kdy platnost této zprávy vyprší.
Zámek tokenu ServiceBusMessageHeaders#LOCK_TOKEN Řetězec Ne Token zámku pro aktuální zprávu.
Uzamčeno do ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Ne Datum a čas, kdy vyprší platnost zámku této zprávy.
Pořadové číslo ServiceBusMessageHeaders#SEQUENCE_NUMBER dlouhý Ne Jedinečné číslo přiřazené ke zprávě službou Service Bus.
Stát ServiceBusMessageHeaders#STATE ServiceBusMessageState Ne Stav zprávy, která může být Aktivní, Odloženo nebo Naplánováno.

Podpora klíče oddílu

Tato úvodní sada podporuje dělení služby Service Bus povolením nastavení klíče oddílu a ID relace v hlavičce zprávy. V této části se dozvíte, jak nastavit klíč oddílu pro zprávy.

Doporučeno: jako klíč hlavičky použijte ServiceBusMessageHeaders.PARTITION_KEY.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nedoporučuje se, ale aktuálně se podporuje:AzureHeaders.PARTITION_KEY jako klíč hlavičky.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Poznámka

Pokud jsou v záhlaví zprávy nastavené ServiceBusMessageHeaders.PARTITION_KEY i AzureHeaders.PARTITION_KEY, je preferován ServiceBusMessageHeaders.PARTITION_KEY.

Podpora relací

Tento příklad ukazuje, jak ručně nastavit ID relace zprávy v aplikaci.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Poznámka

Pokud je ServiceBusMessageHeaders.SESSION_ID nastavena v záhlaví zprávy a je nastavena jiná hlavička ServiceBusMessageHeaders.PARTITION_KEY, hodnota ID relace se nakonec použije k přepsání hodnoty klíče oddílu.

Přizpůsobení vlastností klienta služby Service Bus

Vývojáři můžou pomocí AzureServiceClientBuilderCustomizer přizpůsobit vlastnosti klienta služby Service Bus. Následující příklad přizpůsobí vlastnost sessionIdleTimeout v ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Vzorky

Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.

Integrace Spring s frontou azure Storage

Klíčové koncepty

Azure Queue Storage je služba pro ukládání velkého počtu zpráv. Ke zprávům se dostanete odkudkoli na světě prostřednictvím ověřených volání pomocí protokolu HTTP nebo HTTPS. Zpráva fronty může mít velikost až 64 kB. Fronta může obsahovat miliony zpráv až do celkového limitu kapacity účtu úložiště. Fronty se běžně používají k vytvoření backlogu práce pro asynchronní zpracování.

Nastavení závislostí

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfigurace

Tato úvodní sada nabízí následující možnosti konfigurace:

Vlastnosti konfigurace připojení

Tato část obsahuje možnosti konfigurace používané pro připojení k frontě služby Azure Storage.

Poznámka

Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.

Konfigurovatelné vlastnosti připojení spring-cloud-azure-starter-integration-storage-queue:

Vlastnost Typ Popis
spring.cloud.azure.storage.queue.enabled booleovský Určuje, jestli je povolená fronta služby Azure Storage.
spring.cloud.azure.storage.queue.connection-string Řetězec Hodnota připojovacího řetězce fronty služby Storage
spring.cloud.azure.storage.queue.accountName Řetězec Název účtu fronty úložiště
spring.cloud.azure.storage.queue.accountKey Řetězec Klíč účtu fronty úložiště.
spring.cloud.azure.storage.queue.endpoint Řetězec Koncový bod služby Fronta úložiště
spring.cloud.azure.storage.queue.sasToken Řetězec Přihlašovací údaje tokenu Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion, který se používá při vytváření požadavků rozhraní API.
spring.cloud.azure.storage.queue.messageEncoding Řetězec Kódování zpráv ve frontě

Základní využití

Odesílání zpráv do fronty služby Azure Storage

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

    • Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  • Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  1. Vytvořte DefaultMessageHandler pomocí StorageQueueTemplate bean pro odesílání zpráv do fronty úložiště.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Vytvořte vazbu brány zpráv s výše uvedenou obslužnou rutinou zprávy prostřednictvím kanálu zprávy.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Odesílání zpráv pomocí brány

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Příjem zpráv z fronty služby Azure Storage

  1. Vyplňte možnosti konfigurace přihlašovacích údajů.

  2. Vytvořte jako vstupní kanál bean kanálu zprávy.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Vytvořte StorageQueueMessageSource s StorageQueueTemplate bean pro příjem zpráv do fronty úložiště.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Vytvořte vazbu příjemce zprávy pomocí StorageQueueMessageSource vytvořeného v posledním kroku prostřednictvím kanálu zprávy, který jsme vytvořili dříve.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Vzorky

Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.