Podpora Spring Cloud Azure pro Spring Cloud Stream
Tento článek se vztahuje na:✅ verze 4.19.0 ✅ verze 5.19.0
Spring Cloud Stream je architektura pro vytváření vysoce škálovatelných mikroslužeb založených na událostech propojených se sdílenými systémy zasílání zpráv.
Architektura poskytuje flexibilní programovací model založený na již zavedených a známých idiomech Spring a osvědčených postupech. Mezi tyto osvědčené postupy patří podpora trvalých sémantiek pub/sub, skupin příjemců a stavových oddílů.
Mezi aktuální implementace pořadače patří:
-
spring-cloud-azure-stream-binder-eventhubs
– další informace najdete v tématu Spring Cloud Stream Binder pro službu Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
– další informace najdete v tématu Spring Cloud Stream Binder pro službu Azure Service Bus
Spring Cloud Stream Binder pro Azure Event Hubs
Klíčové koncepty
Spring Cloud Stream Binder pro Azure Event Hubs poskytuje implementaci vazby pro architekturu Spring Cloud Stream. Tato implementace používá adaptéry kanálu Spring Integration Event Hubs na základu. Z hlediska návrhu je služba Event Hubs podobná jako Kafka. Ke službě Event Hubs je také možné přistupovat přes rozhraní Kafka API. Pokud má váš projekt úzkou závislost na rozhraní Kafka API, můžete zkusit Centrum událostí s ukázkou rozhraní Kafka API
Skupina příjemců
Event Hubs poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou. Zatímco Kafka ukládá všechny potvrzené posuny ve zprostředkovateli, musíte ukládat posuny zpráv Event Hubs zpracovávaných ručně. Sada Event Hubs SDK poskytuje funkci pro ukládání takových posunů ve službě Azure Storage.
Podpora dělení
Služba Event Hubs poskytuje podobný koncept fyzického oddílu jako Kafka. Na rozdíl od automatického vyrovnávání kafka mezi příjemci a oddíly ale Event Hubs poskytuje druh preemptivního režimu. Účet úložiště funguje jako zapůjčení, které příjemce vlastní, který oddíl vlastní. Když se spustí nový příjemce, pokusí se ukrást některé oddíly od nejvíce zatížených příjemců, aby dosáhl rovnováhy mezi úlohami.
Pro určení strategie vyrovnávání zatížení jsou k dispozici vlastnosti spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Další informace najdete v části Vlastnosti příjemce.
Podpora příjemců služby Batch
Binder Spring Cloud Azure Stream Event Hubs podporuje funkci Spring Cloud Stream Batch Consumer.
Chcete-li pracovat s režimem dávkového příjemce, nastavte vlastnost spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
na true
. Pokud je tato možnost povolená, zpráva se seznamem dávkových událostí se přijme a předá funkci Consumer
. Každé záhlaví zprávy je také převedeno na seznam, jehož obsah je přidružená hodnota záhlaví parsovaná z každé události. Společné hlavičky ID oddílu, kontrolního bodu a posledního zařazení vlastností jsou prezentovány jako jedna hodnota, protože celá dávka událostí sdílí stejnou hodnotu. Další informace najdete v hlavičce zpráv služby Event Hubs podpory Spring Cloud v Azure proSpring Integration .
Poznámka
Hlavička kontrolního bodu existuje pouze při použití režimu kontrolního bodu MANUAL
.
Vytváření kontrolních bodů dávkového příjemce podporuje dva režimy: BATCH
a MANUAL
.
BATCH
režim je režim automatického vytváření kontrolních bodů, který kontroluje celou dávku událostí společně, jakmile je pořadač přijme.
MANUAL
režim slouží k vytvoření kontrolního bodu událostí uživateli. Při použití se Checkpointer
předá do záhlaví zprávy a uživatelé ho můžou použít k vytváření kontrolních bodů.
Velikost dávky můžete zadat nastavením vlastností max-size
a max-wait-time
, které mají předponu spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. Vlastnost max-size
je nutná a vlastnost max-wait-time
je volitelná. Další informace najdete v části Vlastnosti příjemce.
Nastavení závislostí
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Alternativně můžete také použít úvodní sadu Spring Cloud Azure Stream Event Hubs, jak je znázorněno v následujícím příkladu pro Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfigurace
Pořadač poskytuje následující tři části možností konfigurace:
Vlastnosti konfigurace připojení
Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Event Hubs.
Poznámka
Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.
Konfigurovatelné vlastnosti připojení spring-cloud-azure-stream-binder-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleovský | Určuje, jestli je služba Azure Event Hubs povolená. |
spring.cloud.azure.eventhubs.connection-string | Řetězec | Hodnota připojovacího řetězce oboru názvů služby Event Hubs |
spring.cloud.azure.eventhubs.namespace | Řetězec | Hodnota oboru názvů služby Event Hubs, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName. |
spring.cloud.azure.eventhubs.domain-name | Řetězec | Název domény hodnoty oboru názvů služby Azure Event Hubs |
spring.cloud.azure.eventhubs.custom-endpoint-address | Řetězec | Adresa vlastního koncového bodu |
Spropitné
Běžné možnosti konfigurace sady SDK služby Azure Jsou konfigurovatelné také pro pořadač Spring Cloud Azure Stream Event Hubs. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure.
nebo předpony spring.cloud.azure.eventhubs.
.
Pořadač také ve výchozím nastavení podporuje Spring Could Azure Resource Manageru. Další informace o načtení připojovacího řetězce s objekty zabezpečení, které nejsou uděleny Data
souvisejícími rolemi, najdete v části Základní použitíSpring Could Azure Resource Manageru.
Vlastnosti konfigurace kontrolního bodu
Tato část obsahuje možnosti konfigurace pro službu Storage Blobs, která se používá k zachování vlastnictví oddílu a informací kontrolních bodů.
Poznámka
Ve verzi 4.0.0 není vlastnost spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists povolena ručně, nebude automaticky vytvořen žádný kontejner úložiště s názvem z spring.cloud.stream.bindings.binding-name.destination.
Konfigurace konfigurovatelných vlastností spring-cloud-azure-stream-binder-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleovský | Zda chcete povolit vytváření kontejnerů, pokud neexistuje. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Řetězec | Název účtu úložiště |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Řetězec | Přístupový klíč účtu úložiště. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Řetězec | Název kontejneru úložiště |
Spropitné
Běžné možnosti konfigurace sady SDK služby Azure jsou konfigurovatelné i pro úložiště kontrolních bodů objektů blob služby Storage. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure.
nebo předpony spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Vlastnosti konfigurace vazby služby Azure Event Hubs
Následující možnosti jsou rozdělené do čtyř částí: Vlastnosti příjemce, Rozšířené konfigurace příjemců, Vlastnosti producenta a Konfigurace rozšířeného producenta.
Vlastnosti příjemce
Tyto vlastnosti jsou zpřístupněny prostřednictvím EventHubsConsumerProperties
.
Poznámka
Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Event Hubs nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Režim kontrolního bodu, který se používá, když se uživatel rozhodne, jak se má zpráva kontrolního bodu zobrazit |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Celé číslo | Určuje množství zpráv pro každý oddíl, aby udělal jeden kontrolní bod. Projeví se pouze v PARTITION_COUNT režimu kontrolního bodu. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Trvání | Určuje časový interval, který má provést jeden kontrolní bod. Projeví se pouze v TIME režimu kontrolního bodu. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Celé číslo | Maximální počet událostí v dávce. Vyžaduje se pro režim dávkového příjemce. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Trvání | Maximální doba trvání dávkového využívání. Projeví se pouze v případě, že je povolený režim dávkového příjemce a je volitelný. |
spring.cloud.stream.eventhubs.binding-name.consumer.load-balancing.update-interval | Trvání | Doba trvání intervalu pro aktualizaci. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | Vyrovnávání zatížení –Strategy | Strategie vyrovnávání zatížení. |
spring.cloud.stream.eventhubs.binding-name.consumer.load-balancing.partition-ownership-interval | Trvání | Doba trvání, po které vyprší platnost vlastnictví oddílu. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Booleovský | Určuje, jestli má procesor událostí požadovat informace o poslední události zařazení do fronty v přidruženém oddílu a sledovat informace při přijetí událostí. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Celé číslo | Počet, který příjemce používá k řízení počtu událostí, které příjemce centra událostí aktivně přijme a zařadí do fronty místně. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mapování pomocí klíče jako ID oddílu a hodnot StartPositionProperties |
Mapa obsahující pozici události, která se má použít pro každý oddíl, pokud kontrolní bod pro oddíl neexistuje v úložišti kontrolních bodů. Tato mapa se od klíče od ID oddílu. |
Poznámka
Konfigurace initial-partition-event-position
přijímá map
k určení počáteční pozice pro každé centrum událostí. Jeho klíčem je ID oddílu a hodnota je StartPositionProperties
, která zahrnuje vlastnosti posunu, pořadového čísla, vyčíslené datum a zda včetně. Můžete ho například nastavit jako
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Pokročilá konfigurace příjemce
Výše uvedené připojení, kontrolní boda běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého příjemce pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Vlastnosti producenta
Tyto vlastnosti jsou zpřístupněny prostřednictvím EventHubsProducerProperties
.
Poznámka
Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Event Hubs nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-eventhubs:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | booleovský | Příznak přepínače pro synchronizaci producenta. Pokud je hodnota true, producent po operaci odeslání počká na odpověď. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | dlouhý | Doba čekání na odpověď po operaci odeslání. Projeví se jenom v případech, kdy je povolený producent synchronizace. |
Pokročilá konfigurace producenta
Výše uvedené připojení a běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého producenta pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Základní využití
Odesílání a příjem zpráv ze služby Event Hubs nebo do služby Event Hubs
Vyplňte možnosti konfigurace informacemi o přihlašovacích údaji.
Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definujte dodavatele a spotřebitele.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Podpora dělení
Vytvoří se PartitionSupplier
s informacemi o oddílu zadanými uživatelem, aby se nakonfigurovali informace o oddílu, který se má odeslat. Následující vývojový diagram ukazuje proces získání různých priorit PRO ID oddílu a klíč:
Podpora příjemců služby Batch
Zadejte možnosti konfigurace dávky, jak je znázorněno v následujícím příkladu:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definujte dodavatele a spotřebitele.
V případě režimu kontrolních bodů jako
BATCH
můžete pomocí následujícího kódu odesílat zprávy a využívat je v dávkách.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Pro režim kontrolních bodů jako
MANUAL
můžete použít následující kód k odesílání zpráv a využívání/kontrolního bodu v dávkách.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Poznámka
V dávkovém režimu je výchozí typ obsahu pořadače Spring Cloud Stream application/json
, takže se ujistěte, že datová část zprávy je zarovnaná s typem obsahu. Například při použití výchozího typu obsahu application/json
pro příjem zpráv s String
datovou částí by měla být datová část JSON String
, obklopena dvojitými uvozovkami pro původní String
text. I když text/plain
typ obsahu, může se jednat o objekt String
přímo. Další informace naleznete v tématu Spring Cloud Stream Content Type Vyjednávání.
Zpracování chybových zpráv
Zpracování chybových zpráv odchozí vazby
Ve výchozím nastavení aplikace Spring Integration vytvoří globální kanál chyb s názvem
errorChannel
. Nakonfigurujte následující koncový bod zprávy pro zpracování chybových zpráv odchozí vazby.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Zpracování chybových zpráv příchozí vazby
Spring Cloud Stream Event Hubs Binder podporuje jedno řešení pro zpracování chyb pro příchozí vazby zpráv: obslužné rutiny chyb.
obslužné rutiny chyby
: Spring Cloud Stream zveřejňuje mechanismus pro poskytnutí vlastní obslužné rutiny chyb přidáním
Consumer
, která přijímáErrorMessage
instance. Další informace najdete v tématu Zpracování chybových zpráv v dokumentaci ke službě Spring Cloud Stream.Obslužná rutina chyby výchozí vazby
Nakonfigurujte jednu
Consumer
bean tak, aby spotřebovávat všechny chybové zprávy příchozí vazby. Následující výchozí funkce se přihlásí k odběru každého kanálu chyby příchozí vazby:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Musíte také nastavit vlastnost
spring.cloud.stream.default.error-handler-definition
na název funkce.Obslužná rutina chyby specifická pro vazbu
Nakonfigurujte
Consumer
bean tak, aby spotřebovávat konkrétní chybové zprávy příchozí vazby. Následující funkce se přihlásí k odběru konkrétního chybového kanálu příchozí vazby a má vyšší prioritu než obslužná rutina chyb výchozí vazby:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Musíte také nastavit vlastnost
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
na název funkce.
Hlavičky zpráv služby Event Hubs
Základní podporovaná záhlaví zpráv najdete v části hlavičky zpráv služby Event Hubs části podpory Spring Cloud Azure prospring integration .
Podpora více pořadačů
Připojení k více oborům názvů služby Event Hubs je také podporováno pomocí několika pořadačů. V této ukázce se jako příklad používá připojovací řetězec. Podporují se také přihlašovací údaje instančních objektů a spravovaných identit. Související vlastnosti můžete nastavit v nastavení prostředí každého pořadače.
Pokud chcete použít více pořadačů se službou Event Hubs, nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Poznámka
Předchozí soubor aplikace ukazuje, jak nakonfigurovat jednu výchozí aplikaci pro aplikaci pro všechny vazby. Pokud chcete konfigurovat vrtule pro určitou vazbu, můžete použít konfiguraci, jako je
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Potřebujeme definovat dva dodavatele a dva spotřebitele:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Zřizování prostředků
Pořadač služby Event Hubs podporuje zřizování centra událostí a skupiny příjemců. Uživatelé můžou k povolení zřizování použít následující vlastnosti.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Vzorky
Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.
Spring Cloud Stream Binder pro Azure Service Bus
Klíčové koncepty
Spring Cloud Stream Binder pro Azure Service Bus poskytuje implementaci vazby pro architekturu Spring Cloud Stream Framework. Tato implementace používá adaptéry kanálu Spring Integration Service Bus na základu.
Naplánovaná zpráva
Tento pořadač podporuje odesílání zpráv do tématu pro zpožděné zpracování. Uživatelé můžou posílat naplánované zprávy s hlavičkou x-delay
vyjádřit v milisekundách dobu zpoždění zprávy. Zpráva se doručí do příslušných témat po x-delay
milisekundách.
Skupina příjemců
Téma služby Service Bus poskytuje podobnou podporu skupiny příjemců jako Apache Kafka, ale s mírnou odlišnou logikou.
Tento pořadač spoléhá na Subscription
tématu, které funguje jako skupina příjemců.
Nastavení závislostí
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Alternativně můžete také použít úvodní sadu Spring Cloud Azure Stream Service Bus, jak je znázorněno v následujícím příkladu pro Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfigurace
Pořadač poskytuje následující dvě části možností konfigurace:
Vlastnosti konfigurace připojení
Tato část obsahuje možnosti konfigurace používané pro připojení ke službě Azure Service Bus.
Poznámka
Pokud se rozhodnete použít objekt zabezpečení k ověření a autorizaci pomocí Microsoft Entra ID pro přístup k prostředku Azure, přečtěte si téma Autorizace přístupu s ID Microsoft Entra, abyste měli jistotu, že objekt zabezpečení má dostatečná oprávnění pro přístup k prostředku Azure.
Konfigurovatelné vlastnosti připojení spring-cloud-azure-stream-binder-servicebus:
Vlastnost | Typ | Popis |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleovský | Určuje, jestli je povolená služba Azure Service Bus. |
spring.cloud.azure.servicebus.connection-string | Řetězec | Hodnota připojovacího řetězce oboru názvů služby Service Bus |
spring.cloud.azure.servicebus.custom-endpoint-address | Řetězec | Vlastní adresa koncového bodu, která se má použít při připojování ke službě Service Bus. |
spring.cloud.azure.servicebus.namespace | Řetězec | Hodnota oboru názvů služby Service Bus, což je předpona plně kvalifikovaného názvu domény. Plně kvalifikovaný název domény by se měl skládat z NamespaceName.DomainName. |
spring.cloud.azure.servicebus.domain-name | Řetězec | Název domény hodnoty oboru názvů služby Azure Service Bus |
Poznámka
Běžné možnosti konfigurace sady SDK služby Azure Jsou konfigurovatelné i pro binder Spring Cloud Azure Stream Service Bus. Podporované možnosti konfigurace jsou zavedeny v Spring Cloud Azure konfiguracea lze je nakonfigurovat pomocí sjednocené předpony spring.cloud.azure.
nebo předpony spring.cloud.azure.servicebus.
.
Pořadač také ve výchozím nastavení podporuje Spring Could Azure Resource Manageru. Další informace o načtení připojovacího řetězce s objekty zabezpečení, které nejsou uděleny Data
souvisejícími rolemi, najdete v části Základní použitíSpring Could Azure Resource Manageru.
Vlastnosti konfigurace vazby služby Azure Service Bus
Následující možnosti jsou rozdělené do čtyř částí: Vlastnosti příjemce, Rozšířené konfigurace příjemců, Vlastnosti producenta a Konfigurace rozšířeného producenta.
Vlastnosti příjemce
Tyto vlastnosti jsou zpřístupněny prostřednictvím ServiceBusConsumerProperties
.
Poznámka
Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Service Bus nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-servicebus:
Vlastnost | Typ | Výchozí | Popis |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | booleovský | falešný | Pokud jsou neúspěšné zprávy směrovány do dlQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Celé číslo | 1 | Maximální počet souběžných zpráv, které by měl zpracovat klient procesoru Service Bus. Pokud je relace povolená, platí pro každou relaci. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Celé číslo | nula | Maximální počet souběžných relací, které se mají zpracovat v libovolném okamžiku. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Booleovský | nula | Určuje, jestli je relace povolená. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Celé číslo | 0 | Prefetch count of the Service Bus processor client. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Dílčí fronta | žádný | Typ podřízené fronty, ke které se chcete připojit. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Trvání | 5 min. | Doba, po kterou bude zámek automaticky obnovovat. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Režim příjmu klienta procesoru služby Service Bus |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Booleovský | pravdivý | Zda se mají zprávy automaticky vypořádávat. Pokud je nastavená hodnota false, přidá se záhlaví zprávy Checkpointer , aby vývojáři mohli zprávy ručně vyřešit. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabajtů | Dlouhé celé číslo | 1024 | Maximální velikost fronty nebo tématu v megabajtech, což je velikost paměti přidělené frontě nebo tématu. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Trvání | P10675199DT2H48M5.4775807S. (10675199 dny, 2 hodiny, 48 minut, 5 sekund a 477 milisekund) | Doba trvání, po které vyprší platnost zprávy, počínaje odesláním zprávy do služby Service Bus. |
Důležitý
Pokud používáte Azure Resource Manageru (ARM), musíte nakonfigurovat vlastnost spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Další informace najdete v ukázce servicebus-queue-binder-arm na GitHubu.
Pokročilá konfigurace příjemce
Výše uvedené připojení a běžné konfigurace klienta sady Azure SDK podporují přizpůsobení pro každého příjemce pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Vlastnosti producenta
Tyto vlastnosti jsou zpřístupněny prostřednictvím ServiceBusProducerProperties
.
Poznámka
Aby nedocházelo k opakování, protože verze 4.19.0 a 5.19.0 podporuje Spring Cloud Azure Stream Binder Service Bus nastavení hodnot pro všechny kanály ve formátu spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Konfigurovatelné vlastnosti spring-cloud-azure-stream-binder-servicebus:
Vlastnost | Typ | Výchozí | Popis |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | booleovský | falešný | Přepněte příznak pro synchronizaci producenta. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | dlouhý | 10000 | Hodnota časového limitu pro odesílání producenta |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nula | Typ entity Service Bus producenta, který je nutný pro závazného producenta. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabajtů | Dlouhé celé číslo | 1024 | Maximální velikost fronty nebo tématu v megabajtech, což je velikost paměti přidělené frontě nebo tématu. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Trvání | P10675199DT2H48M5.4775807S. (10675199 dny, 2 hodiny, 48 minut, 5 sekund a 477 milisekund) | Doba trvání, po které vyprší platnost zprávy, počínaje odesláním zprávy do služby Service Bus. |
Důležitý
Při použití producenta vazby je nutné nakonfigurovat vlastnost spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Pokročilá konfigurace producenta
Výše uvedené připojení a běžné klienty sady Azure SDK konfigurace podporují přizpůsobení pro každého producenta pořadače, který můžete nakonfigurovat s předponou spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Základní využití
Odesílání a příjem zpráv z/do služby Service Bus
Vyplňte možnosti konfigurace informacemi o přihlašovacích údaji.
Pro přihlašovací údaje jako připojovací řetězec nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Pro přihlašovací údaje jako instanční objekt nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
U přihlašovacích údajů jako spravovaných identit nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definujte dodavatele a spotřebitele.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Podpora klíče oddílu
Pořadač podporuje dělení služby Service Bus povolením nastavení klíče oddílu a ID relace v hlavičce zprávy. V této části se dozvíte, jak nastavit klíč oddílu pro zprávy.
Spring Cloud Stream poskytuje vlastnost výrazu SpEL klíče oddílu spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Můžete například nastavit tuto vlastnost jako "'partitionKey-' + headers[<message-header-key>]"
a přidat hlavičku s názvem message-header-key. Spring Cloud Stream používá hodnotu pro tuto hlavičku při vyhodnocování výrazu k přiřazení klíče oddílu. Následující kód poskytuje příklad producenta:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Podpora relací
Pořadač podporuje relace zpráv služby Service Bus. ID relace zprávy lze nastavit prostřednictvím záhlaví zprávy.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Poznámka
Podle dělení služby Service Bus má ID relace vyšší prioritu než klíč oddílu. Takže pokud jsou nastaveny obě hlavičky ServiceBusMessageHeaders#SESSION_ID
i ServiceBusMessageHeaders#PARTITION_KEY
, hodnota ID relace se nakonec použije k přepsání hodnoty klíče oddílu.
Zpracování chybových zpráv
Zpracování chybových zpráv odchozí vazby
Ve výchozím nastavení aplikace Spring Integration vytvoří globální kanál chyb s názvem
errorChannel
. Nakonfigurujte následující koncový bod zprávy pro zpracování chybové zprávy odchozí vazby.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Zpracování chybových zpráv příchozí vazby
Spring Cloud Stream Service Bus Binder podporuje dvě řešení pro zpracování chyb pro příchozí vazby zpráv: obslužnou rutinu chyby pořadače a obslužné rutiny.
obslužné rutiny chyby Binderu:
Výchozí obslužná rutina chyby pořadače zpracovává příchozí vazbu. Tuto obslužnou rutinu použijete k odesílání zpráv, které selhaly, do fronty nedoručených zpráv při povolení
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
. V opačném případě jsou zprávy, které selhaly, opuštěny. Obslužná rutina chyby pořadače se vzájemně vylučují s jinými zadanými obslužnými rutinami chyb.obslužné rutiny chyby
: Spring Cloud Stream zveřejňuje mechanismus pro poskytnutí vlastní obslužné rutiny chyb přidáním
Consumer
, která přijímáErrorMessage
instance. Další informace najdete v tématu Zpracování chybových zpráv v dokumentaci ke službě Spring Cloud Stream.Obslužná rutina chyby výchozí vazby
Nakonfigurujte jednu
Consumer
bean tak, aby spotřebovávat všechny chybové zprávy příchozí vazby. Následující výchozí funkce se přihlásí k odběru každého kanálu chyby příchozí vazby:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Musíte také nastavit vlastnost
spring.cloud.stream.default.error-handler-definition
na název funkce.Obslužná rutina chyby specifická pro vazbu
Nakonfigurujte
Consumer
bean tak, aby spotřebovávat konkrétní chybové zprávy příchozí vazby. Následující funkce se přihlásí k odběru konkrétního chybového kanálu příchozí vazby s vyšší prioritou než obslužná rutina chyby výchozí vazby.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Musíte také nastavit vlastnost
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
na název funkce.
Hlavičky zpráv služby Service Bus
Základní záhlaví zpráv podporovaná najdete v části hlavičky zpráv
Poznámka
Při nastavování klíče oddílu je priorita hlavičky zprávy vyšší než vlastnost Spring Cloud Stream.
spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
se tedy projeví jenom v případech, kdy není nakonfigurovaná žádná z ServiceBusMessageHeaders#SESSION_ID
a ServiceBusMessageHeaders#PARTITION_KEY
hlaviček.
Podpora více pořadačů
Připojení k více oborům názvů služby Service Bus je také podporováno pomocí více pořadačů. V této ukázce se jako příklad používá připojovací řetězec. Podporují se také přihlašovací údaje instančních objektů a spravovaných identit. Uživatelé můžou v nastavení prostředí každého pořadače nastavit související vlastnosti.
Pokud chcete použít více pořadačů ServiceBus, nakonfigurujte v souboru application.yml následující vlastnosti:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Poznámka
Předchozí soubor aplikace ukazuje, jak nakonfigurovat jednu výchozí aplikaci pro aplikaci pro všechny vazby. Pokud chcete konfigurovat vrtule pro určitou vazbu, můžete použít konfiguraci, jako je
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.potřebujeme definovat dva dodavatele a dva spotřebitele.
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Zřizování prostředků
Pořadač služby Service Bus podporuje zřizování fronty, tématu a předplatného, uživatelé můžou k povolení zřizování použít následující vlastnosti.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Poznámka
Hodnoty povolené pro tenant-id
jsou: common
, organizations
, consumers
nebo ID tenanta. Další informace o těchto hodnotách najdete v části Použití nesprávného koncového bodu (osobní a organizační účty) části Chyba AADSTS50020 – Uživatelský účet od zprostředkovatele identity vtenanta neexistuje . Informace o převodu aplikace s jedním tenantem najdete v tématu Převod jednoklientských aplikací na víceklienta vMicrosoft Entra ID .
Přizpůsobení vlastností klienta služby Service Bus
Vývojáři můžou pomocí AzureServiceClientBuilderCustomizer
přizpůsobit vlastnosti klienta služby Service Bus. Následující příklad přizpůsobí vlastnost sessionIdleTimeout
v ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Vzorky
Další informace najdete v azure-spring-boot-samples úložišti na GitHubu.