Sdílet prostřednictvím


Podpora Spring Cloud Azure pro Spring Cloud Stream

Tento článek se vztahuje na:✅ verze 4.19.0 ✅ verze 5.19.0

Spring Cloud Stream je architektura pro vytváření vysoce škálovatelných mikroslužeb založených na událostech propojených se sdílenými systémy zasílání zpráv.

Architektura poskytuje flexibilní programovací model založený na již zavedených a známých idiomech Spring a osvědčených postupech. Mezi tyto osvědčené postupy patří podpora trvalých sémantiek pub/sub, skupin příjemců a stavových oddílů.

Mezi aktuální implementace pořadače patří:

Spring Cloud Stream Binder pro Azure Event Hubs

Klíčové koncepty

Spring Cloud Stream Binder pro Azure Event Hubs poskytuje implementaci vazby pro architekturu Spring Cloud Stream. Tato implementace používá adaptéry kanálu Spring Integration Event Hubs na základu. Z hlediska návrhu je služba Event Hubs podobná jako Kafka. Ke službě Event Hubs je také možné přistupovat přes rozhraní Kafka API. Pokud má váš projekt úzkou závislost na rozhraní Kafka API, můžete zkusit Centrum událostí s ukázkou rozhraní Kafka API

Skupina příjemců

Event Hubs poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou. Zatímco Kafka ukládá všechny potvrzené posuny ve zprostředkovateli, musíte ukládat posuny zpráv Event Hubs zpracovávaných ručně. Sada Event Hubs SDK poskytuje funkci pro ukládání takových posunů ve službě Azure Storage.

Podpora dělení

Služba Event Hubs poskytuje podobný koncept fyzického oddílu jako Kafka. Na rozdíl od automatického vyrovnávání kafka mezi příjemci a oddíly ale Event Hubs poskytuje druh preemptivního režimu. Účet úložiště funguje jako zapůjčení, které příjemce vlastní, který oddíl vlastní. Když se spustí nový příjemce, pokusí se ukrást některé oddíly od nejvíce zatížených příjemců, aby dosáhl rovnováhy mezi úlohami.

Pro určení strategie vyrovnávání zatížení jsou k dispozici vlastnosti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Další informace najdete v části Vlastnosti příjemce.

Podpora příjemců služby Batch

Binder Spring Cloud Azure Stream Event Hubs podporuje funkci Spring Cloud Stream Batch Consumer.

Chcete-li pracovat s režimem dávkového příjemce, nastavte vlastnost spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode na true. Pokud je tato možnost povolená, zpráva se seznamem dávkových událostí se přijme a předá funkci Consumer. Každé záhlaví zprávy je také převedeno na seznam, jehož obsah je přidružená hodnota záhlaví parsovaná z každé události. Společné hlavičky ID oddílu, kontrolního bodu a posledního zařazení vlastností jsou prezentovány jako jedna hodnota, protože celá dávka událostí sdílí stejnou hodnotu. Další informace najdete v hlavičce zpráv služby Event Hubs podpory Spring Cloud v Azure proSpring Integration .

Poznámka

Hlavička kontrolního bodu existuje pouze při použití režimu kontrolního bodu MANUAL.

Vytváření kontrolních bodů dávkového příjemce podporuje dva režimy: BATCH a MANUAL. BATCH režim je režim automatického vytváření kontrolních bodů, který kontroluje celou dávku událostí společně, jakmile je pořadač přijme. MANUAL režim slouží k vytvoření kontrolního bodu událostí uživateli. Při použití se Checkpointer předá do záhlaví zprávy a uživatelé ho můžou použít k vytváření kontrolních bodů.

Velikost dávky můžete zadat nastavením vlastností max-size a max-wait-time, které mají předponu spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Vlastnost max-size je nutná a vlastnost max-wait-time je volitelná. Další informace najdete v části Vlastnosti příjemce.

Nastavení závislostí

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Alternativně můžete také použít úvodní sadu Spring Cloud Azure Stream Event Hubs, jak je znázorněno v následujícím příkladu pro Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfigurace

Pořadač poskytuje následující tři části možností konfigurace:

Vlastnosti konfigurace připojení

Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Event Hubs.

Poznámka

Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.

Konfigurovatelné vlastnosti připojení spring-cloud-azure-stream-binder-eventhubs:

Vlastnost Typ Popis
spring.cloud.azure.eventhubs.enabled booleovský Určuje, jestli je služba Azure Event Hubs povolená.
spring.cloud.azure.eventhubs.connection-string Řetězec Hodnota připojovacího řetězce oboru názvů služby Event Hubs
spring.cloud.azure.eventhubs.namespace Řetězec Hodnota oboru názvů služby Event Hubs, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName.
spring.cloud.azure.eventhubs.domain-name Řetězec Název domény hodnoty oboru názvů služby Azure Event Hubs
spring.cloud.azure.eventhubs.custom-endpoint-address Řetězec Adresa vlastního koncového bodu

Spropitné

Běžné možnosti konfigurace sady SDK služby Azure Jsou konfigurovatelné také pro pořadač Spring Cloud Azure Stream Event Hubs. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure. nebo předpony spring.cloud.azure.eventhubs..

Pořadač také ve výchozím nastavení podporuje Spring Could Azure Resource Manageru. Další informace o načtení připojovacího řetězce s objekty zabezpečení, které nejsou uděleny Data souvisejícími rolemi, najdete v části Základní použitíSpring Could Azure Resource Manageru.

Vlastnosti konfigurace kontrolního bodu

Tato část obsahuje možnosti konfigurace pro službu Storage Blobs, která se používá k zachování vlastnictví oddílu a informací kontrolních bodů.

Poznámka

Ve verzi 4.0.0 není vlastnost spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists povolena ručně, nebude automaticky vytvořen žádný kontejner úložiště s názvem z spring.cloud.stream.bindings.binding-name.destination.

Konfigurace konfigurovatelných vlastností spring-cloud-azure-stream-binder-eventhubs:

Vlastnost Typ Popis
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleovský Zda chcete povolit vytváření kontejnerů, pokud neexistuje.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Řetězec Název účtu úložiště
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Řetězec Přístupový klíč účtu úložiště.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Řetězec Název kontejneru úložiště

Spropitné

Běžné možnosti konfigurace sady SDK služby Azure jsou konfigurovatelné i pro úložiště kontrolních bodů objektů blob služby Storage. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure. nebo předpony spring.cloud.azure.eventhubs.processor.checkpoint-store.

Vlastnosti konfigurace vazby služby Azure Event Hubs

Následující možnosti jsou rozdělené do čtyř částí: Vlastnosti příjemce, Rozšířené konfigurace příjemců, Vlastnosti producenta a Konfigurace rozšířeného producenta.

Vlastnosti příjemce

Tyto vlastnosti jsou zpřístupněny prostřednictvím EventHubsConsumerProperties.

Poznámka

Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Event Hubs nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-eventhubs:

Vlastnost Typ Popis
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Režim kontrolního bodu, který se používá, když se uživatel rozhodne, jak se má zpráva kontrolního bodu zobrazit
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Celé číslo Určuje množství zpráv pro každý oddíl, aby udělal jeden kontrolní bod. Projeví se pouze v PARTITION_COUNT režimu kontrolního bodu.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Trvání Určuje časový interval, který má provést jeden kontrolní bod. Projeví se pouze v TIME režimu kontrolního bodu.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Celé číslo Maximální počet událostí v dávce. Vyžaduje se pro režim dávkového příjemce.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Trvání Maximální doba trvání dávkového využívání. Projeví se pouze v případě, že je povolený režim dávkového příjemce a je volitelný.
spring.cloud.stream.eventhubs.binding-name.consumer.load-balancing.update-interval Trvání Doba trvání intervalu pro aktualizaci.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy Vyrovnávání zatížení –Strategy Strategie vyrovnávání zatížení.
spring.cloud.stream.eventhubs.binding-name.consumer.load-balancing.partition-ownership-interval Trvání Doba trvání, po které vyprší platnost vlastnictví oddílu.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleovský Určuje, jestli má procesor událostí požadovat informace o poslední události zařazení do fronty v přidruženém oddílu a sledovat informace při přijetí událostí.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Celé číslo Počet, který příjemce používá k řízení počtu událostí, které příjemce centra událostí aktivně přijme a zařadí do fronty místně.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapování pomocí klíče jako ID oddílu a hodnot StartPositionProperties Mapa obsahující pozici události, která se má použít pro každý oddíl, pokud kontrolní bod pro oddíl neexistuje v úložišti kontrolních bodů. Tato mapa se od klíče od ID oddílu.

Poznámka

Konfigurace initial-partition-event-position přijímá map k určení počáteční pozice pro každé centrum událostí. Jeho klíčem je ID oddílu a hodnota je StartPositionProperties, která zahrnuje vlastnosti posunu, pořadového čísla, vyčíslené datum a zda včetně. Můžete ho například nastavit jako

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Pokročilá konfigurace příjemce

Výše uvedené připojení, kontrolní boda běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého příjemce pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Vlastnosti producenta

Tyto vlastnosti jsou zpřístupněny prostřednictvím EventHubsProducerProperties.

Poznámka

Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Event Hubs nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-eventhubs:

Vlastnost Typ Popis
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booleovský Příznak přepínače pro synchronizaci producenta. Pokud je hodnota true, producent po operaci odeslání počká na odpověď.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout dlouhý Doba čekání na odpověď po operaci odeslání. Projeví se jenom v případech, kdy je povolený producent synchronizace.
Pokročilá konfigurace producenta

Výše uvedené připojení a běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého producenta pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Základní využití

Odesílání a příjem zpráv ze služby Event Hubs nebo do služby Event Hubs

  1. Vyplňte možnosti konfigurace informacemi o přihlašovacích údaji.

    • Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  • U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definujte dodavatele a spotřebitele.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Podpora dělení

Vytvoří se PartitionSupplier s informacemi o oddílu zadanými uživatelem, aby se nakonfigurovali informace o oddílu, který se má odeslat. Následující vývojový diagram ukazuje proces získání různých priorit PRO ID oddílu a klíč:

diagram znázorňující vývojový diagram procesu podpory dělení

Podpora příjemců služby Batch

  1. Zadejte možnosti konfigurace dávky, jak je znázorněno v následujícím příkladu:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definujte dodavatele a spotřebitele.

    V případě režimu kontrolních bodů jako BATCHmůžete pomocí následujícího kódu odesílat zprávy a využívat je v dávkách.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Pro režim kontrolních bodů jako MANUALmůžete použít následující kód k odesílání zpráv a využívání/kontrolního bodu v dávkách.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Poznámka

V dávkovém režimu je výchozí typ obsahu pořadače Spring Cloud Stream application/json, takže se ujistěte, že datová část zprávy je zarovnaná s typem obsahu. Například při použití výchozího typu obsahu application/json pro příjem zpráv s String datovou částí by měla být datová část JSON String, obklopena dvojitými uvozovkami pro původní String text. I když text/plain typ obsahu, může se jednat o objekt String přímo. Další informace naleznete v tématu Spring Cloud Stream Content Type Vyjednávání.

Zpracování chybových zpráv

  • Zpracování chybových zpráv odchozí vazby

    Ve výchozím nastavení aplikace Spring Integration vytvoří globální kanál chyb s názvem errorChannel. Nakonfigurujte následující koncový bod zprávy pro zpracování chybových zpráv odchozí vazby.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Zpracování chybových zpráv příchozí vazby

    Spring Cloud Stream Event Hubs Binder podporuje jedno řešení pro zpracování chyb pro příchozí vazby zpráv: obslužné rutiny chyb.

    obslužné rutiny chyby :

    Spring Cloud Stream zveřejňuje mechanismus pro poskytnutí vlastní obslužné rutiny chyb přidáním Consumer, která přijímá ErrorMessage instance. Další informace najdete v tématu Zpracování chybových zpráv v dokumentaci ke službě Spring Cloud Stream.

    • Obslužná rutina chyby výchozí vazby

      Nakonfigurujte jednu Consumer bean tak, aby spotřebovávat všechny chybové zprávy příchozí vazby. Následující výchozí funkce se přihlásí k odběru každého kanálu chyby příchozí vazby:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Musíte také nastavit vlastnost spring.cloud.stream.default.error-handler-definition na název funkce.

    • Obslužná rutina chyby specifická pro vazbu

      Nakonfigurujte Consumer bean tak, aby spotřebovávat konkrétní chybové zprávy příchozí vazby. Následující funkce se přihlásí k odběru konkrétního chybového kanálu příchozí vazby a má vyšší prioritu než obslužná rutina chyb výchozí vazby:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Musíte také nastavit vlastnost spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition na název funkce.

Hlavičky zpráv služby Event Hubs

Základní podporovaná záhlaví zpráv najdete v části hlavičky zpráv služby Event Hubs části podpory Spring Cloud Azure prospring integration .

Podpora více pořadačů

Připojení k více oborům názvů služby Event Hubs je také podporováno pomocí několika pořadačů. V této ukázce se jako příklad používá připojovací řetězec. Podporují se také přihlašovací údaje instančních objektů a spravovaných identit. Související vlastnosti můžete nastavit v nastavení prostředí každého pořadače.

  1. Pokud chcete použít více pořadačů se službou Event Hubs, nakonfigurujte v souboru application.yml následující vlastnosti:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Poznámka

    Předchozí soubor aplikace ukazuje, jak nakonfigurovat jednu výchozí aplikaci pro aplikaci pro všechny vazby. Pokud chcete konfigurovat vrtule pro určitou vazbu, můžete použít konfiguraci, jako je spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Potřebujeme definovat dva dodavatele a dva spotřebitele:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Zřizování prostředků

Pořadač služby Event Hubs podporuje zřizování centra událostí a skupiny příjemců. Uživatelé můžou k povolení zřizování použít následující vlastnosti.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

Vzorky

Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.

Spring Cloud Stream Binder pro Azure Service Bus

Klíčové koncepty

Spring Cloud Stream Binder pro Azure Service Bus poskytuje implementaci vazby pro architekturu Spring Cloud Stream Framework. Tato implementace používá adaptéry kanálu Spring Integration Service Bus na základu.

Naplánovaná zpráva

Tento pořadač podporuje odesílání zpráv do tématu pro zpožděné zpracování. Uživatelé můžou posílat naplánované zprávy s hlavičkou x-delay vyjádřit v milisekundách dobu zpoždění zprávy. Zpráva se doručí do příslušných témat po x-delay milisekundách.

Skupina příjemců

Téma služby Service Bus poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou. Tento pořadač spoléhá na Subscription tématu, které funguje jako skupina příjemců.

Nastavení závislostí

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Alternativně můžete také použít úvodní sadu Spring Cloud Azure Stream Service Bus, jak je znázorněno v následujícím příkladu pro Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfigurace

Pořadač poskytuje následující dvě části možností konfigurace:

Vlastnosti konfigurace připojení

Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Service Bus.

Poznámka

Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.

Konfigurovatelné vlastnosti připojení spring-cloud-azure-stream-binder-servicebus:

Vlastnost Typ Popis
spring.cloud.azure.servicebus.enabled booleovský Určuje, jestli je povolená služba Azure Service Bus.
spring.cloud.azure.servicebus.connection-string Řetězec Hodnota připojovacího řetězce oboru názvů služby Service Bus
spring.cloud.azure.servicebus.custom-endpoint-address Řetězec Vlastní adresa koncového bodu, která se má použít při připojování ke službě Service Bus.
spring.cloud.azure.servicebus.namespace Řetězec Hodnota oboru názvů služby Service Bus, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName.
spring.cloud.azure.servicebus.domain-name Řetězec Název domény hodnoty oboru názvů služby Azure Service Bus

Poznámka

Běžné možnosti konfigurace sady SDK služby Azure Jsou konfigurovatelné i pro binder Spring Cloud Azure Stream Service Bus. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure. nebo předpony spring.cloud.azure.servicebus..

Pořadač také ve výchozím nastavení podporuje Spring Could Azure Resource Manageru. Další informace o načtení připojovacího řetězce s objekty zabezpečení, které nejsou uděleny Data souvisejícími rolemi, najdete v části Základní použitíSpring Could Azure Resource Manageru.

Vlastnosti konfigurace vazby služby Azure Service Bus

Následující možnosti jsou rozdělené do čtyř částí: Vlastnosti příjemce, Rozšířené konfigurace příjemců, Vlastnosti producenta a Konfigurace rozšířeného producenta.

Vlastnosti příjemce

Tyto vlastnosti jsou zpřístupněny prostřednictvím ServiceBusConsumerProperties.

Poznámka

Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Service Bus nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-servicebus:

Vlastnost Typ Výchozí Popis
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booleovský falešný Pokud jsou neúspěšné zprávy směrovány do dlQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Celé číslo 1 Maximální počet souběžných zpráv, které by měl zpracovat klient procesoru Service Bus. Pokud je relace povolená, platí pro každou relaci.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Celé číslo nula Maximální počet souběžných relací, které se mají zpracovat v libovolném okamžiku.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booleovský nula Určuje, jestli je relace povolená.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Celé číslo 0 Prefetch count of the Service Bus processor client.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Dílčí fronta žádný Typ podřízené fronty, ke které se chcete připojit.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Trvání 5 min. Doba, po kterou bude zámek automaticky obnovovat.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Režim příjmu klienta procesoru služby Service Bus
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booleovský pravdivý Zda se mají zprávy automaticky vypořádávat. Pokud je nastavená hodnota false, přidá se záhlaví zprávy Checkpointer, aby vývojáři mohli zprávy ručně vyřešit.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabajtů Dlouhé celé číslo 1024 Maximální velikost fronty nebo tématu v megabajtech, což je velikost paměti přidělené frontě nebo tématu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Trvání P10675199DT2H48M5.4775807S. (10675199 dny, 2 hodiny, 48 minut, 5 sekund a 477 milisekund) Doba trvání, po které vyprší platnost zprávy, počínaje odesláním zprávy do služby Service Bus.

Důležitý

Pokud používáte Azure Resource Manageru (ARM), musíte nakonfigurovat vlastnost spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Další informace najdete v ukázce servicebus-queue-binder-arm na GitHubu.

Pokročilá konfigurace příjemce

Výše uvedené připojení a běžné konfigurace klienta sady Azure SDK podporují přizpůsobení pro každého příjemce pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Vlastnosti producenta

Tyto vlastnosti jsou zpřístupněny prostřednictvím ServiceBusProducerProperties.

Poznámka

Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Service Bus nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-servicebus:

Vlastnost Typ Výchozí Popis
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booleovský falešný Přepněte příznak pro synchronizaci producenta.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout dlouhý 10000 Hodnota časového limitu pro odesílání producenta
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nula Typ entity Service Bus producenta, který je nutný pro závazného producenta.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabajtů Dlouhé celé číslo 1024 Maximální velikost fronty nebo tématu v megabajtech, což je velikost paměti přidělené frontě nebo tématu.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Trvání P10675199DT2H48M5.4775807S. (10675199 dny, 2 hodiny, 48 minut, 5 sekund a 477 milisekund) Doba trvání, po které vyprší platnost zprávy, počínaje odesláním zprávy do služby Service Bus.

Důležitý

Při použití producenta vazby je nutné nakonfigurovat vlastnost spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Pokročilá konfigurace producenta

Výše uvedené připojení a běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého producenta pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Základní využití

Odesílání a příjem zpráv z/do služby Service Bus

  1. Vyplňte možnosti konfigurace informacemi o přihlašovacích údaji.

    • Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

  • U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definujte dodavatele a spotřebitele.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Podpora klíče oddílu

Pořadač podporuje dělení služby Service Bus povolením nastavení klíče oddílu a ID relace v hlavičce zprávy. V této části se dozvíte, jak nastavit klíč oddílu pro zprávy.

Spring Cloud Stream poskytuje vlastnost výrazu SpEL klíče oddílu spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Můžete například nastavit tuto vlastnost jako "'partitionKey-' + headers[<message-header-key>]" a přidat hlavičku s názvem message-header-key. Spring Cloud Stream používá hodnotu pro tuto hlavičku při vyhodnocování výrazu k přiřazení klíče oddílu. Následující kód poskytuje příklad producenta:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Podpora relací

Pořadač podporuje relace zpráv služby Service Bus. ID relace zprávy lze nastavit prostřednictvím záhlaví zprávy.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Poznámka

Podle dělení služby Service Bus má ID relace vyšší prioritu než klíč oddílu. Takže pokud jsou nastaveny obě hlavičky ServiceBusMessageHeaders#SESSION_ID i ServiceBusMessageHeaders#PARTITION_KEY, hodnota ID relace se nakonec použije k přepsání hodnoty klíče oddílu.

Zpracování chybových zpráv

  • Zpracování chybových zpráv odchozí vazby

    Ve výchozím nastavení aplikace Spring Integration vytvoří globální kanál chyb s názvem errorChannel. Nakonfigurujte následující koncový bod zprávy pro zpracování chybové zprávy odchozí vazby.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Zpracování chybových zpráv příchozí vazby

    Spring Cloud Stream Service Bus Binder podporuje dvě řešení pro zpracování chyb pro příchozí vazby zpráv: obslužnou rutinu chyby pořadače a obslužné rutiny.

    obslužné rutiny chyby Binderu:

    Výchozí obslužná rutina chyby pořadače zpracovává příchozí vazbu. Tuto obslužnou rutinu použijete k odesílání zpráv, které selhaly, do fronty nedoručených zpráv při povolení spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected. V opačném případě jsou zprávy, které selhaly, opuštěny. Obslužná rutina chyby pořadače se vzájemně vylučují s jinými zadanými obslužnými rutinami chyb.

    obslužné rutiny chyby :

    Spring Cloud Stream zveřejňuje mechanismus pro poskytnutí vlastní obslužné rutiny chyb přidáním Consumer, která přijímá ErrorMessage instance. Další informace najdete v tématu Zpracování chybových zpráv v dokumentaci ke službě Spring Cloud Stream.

    • Obslužná rutina chyby výchozí vazby

      Nakonfigurujte jednu Consumer bean tak, aby spotřebovávat všechny chybové zprávy příchozí vazby. Následující výchozí funkce se přihlásí k odběru každého kanálu chyby příchozí vazby:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Musíte také nastavit vlastnost spring.cloud.stream.default.error-handler-definition na název funkce.

    • Obslužná rutina chyby specifická pro vazbu

      Nakonfigurujte Consumer bean tak, aby spotřebovávat konkrétní chybové zprávy příchozí vazby. Následující funkce se přihlásí k odběru konkrétního chybového kanálu příchozí vazby s vyšší prioritou než obslužná rutina chyby výchozí vazby.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Musíte také nastavit vlastnost spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition na název funkce.

Hlavičky zpráv služby Service Bus

Základní záhlaví zpráv podporovaná najdete v části hlavičky zpráv Service Bus podpory Spring Cloud azure proSpring Integration .

Poznámka

Při nastavování klíče oddílu je priorita hlavičky zprávy vyšší než vlastnost Spring Cloud Stream. spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression se tedy projeví jenom v případech, kdy není nakonfigurovaná žádná z ServiceBusMessageHeaders#SESSION_ID a ServiceBusMessageHeaders#PARTITION_KEY hlaviček.

Podpora více pořadačů

Připojení k více oborům názvů služby Service Bus je také podporováno pomocí více pořadačů. V této ukázce se jako příklad používá připojovací řetězec. Podporují se také přihlašovací údaje instančních objektů a spravovaných identit. Uživatelé můžou v nastavení prostředí každého pořadače nastavit související vlastnosti.

  1. Pokud chcete použít více pořadačů ServiceBus, nakonfigurujte v souboru application.yml následující vlastnosti:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Poznámka

    Předchozí soubor aplikace ukazuje, jak nakonfigurovat jednu výchozí aplikaci pro aplikaci pro všechny vazby. Pokud chcete konfigurovat vrtule pro určitou vazbu, můžete použít konfiguraci, jako je spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. potřebujeme definovat dva dodavatele a dva spotřebitele.

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Zřizování prostředků

Pořadač služby Service Bus podporuje zřizování fronty, tématu a předplatného, uživatelé můžou k povolení zřizování použít následující vlastnosti.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Poznámka

Hodnoty povolené pro tenant-id jsou: common, organizations, consumersnebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .

Přizpůsobení vlastností klienta služby Service Bus

Vývojáři můžou pomocí AzureServiceClientBuilderCustomizer přizpůsobit vlastnosti klienta služby Service Bus. Následující příklad přizpůsobí vlastnost sessionIdleTimeout v ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Vzorky

Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.