Obsługa platformy Azure platformy Spring Cloud dla usługi Spring Cloud Stream
Ten artykuł dotyczy:✅ w wersji 4.19.0 ✅ w wersji 5.19.0
Spring Cloud Stream to struktura umożliwiająca tworzenie wysoce skalowalnych mikrousług opartych na zdarzeniach połączonych z udostępnionymi systemami obsługi komunikatów.
Platforma udostępnia elastyczny model programowania oparty na już ustalonych i znanych idiomach Spring i najlepszych rozwiązaniach. Te najlepsze rozwiązania obejmują obsługę trwałych semantyki pub/podsieci, grup odbiorców i partycji stanowych.
Bieżące implementacje bindera obejmują:
-
spring-cloud-azure-stream-binder-eventhubs
— aby uzyskać więcej informacji, zobacz binder strumienia Spring Cloud dla usługi Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
— aby uzyskać więcej informacji, zobacz Spring Cloud Stream Binder for Azure Service Bus
Powiązanie strumienia Spring Cloud dla usługi Azure Event Hubs
Kluczowe pojęcia
Powiązanie strumienia Spring Cloud dla usługi Azure Event Hubs zapewnia implementację powiązań dla platformy Spring Cloud Stream. Ta implementacja używa adapterów kanału spring Integration Event Hubs w jego podstawach. Z perspektywy projektu usługa Event Hubs jest podobna do platformy Kafka. Ponadto dostęp do usługi Event Hubs można uzyskać za pośrednictwem interfejsu API platformy Kafka. Jeśli projekt ma ścisłą zależność od interfejsu API platformy Kafka, możesz spróbować Event Hub przy użyciu przykładu interfejsu API platformy Kafka— przykład
Grupa odbiorców
Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.
Obsługa partycjonowania
Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, który użytkownik jest właścicielem której partycji. Po uruchomieniu nowego użytkownika próbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równowagi obciążenia.
Aby określić strategię równoważenia obciążenia, udostępniane są właściwości spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
. Aby uzyskać więcej informacji, zobacz sekcję Właściwości konsumenta.
Obsługa konsumentów usługi Batch
Binder spring Cloud Azure Stream Event Hubs obsługuje funkcję Spring Cloud Batch Consumer.
Aby pracować z trybem batch-consumer, ustaw właściwość spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
na wartość true
. Po włączeniu komunikat z ładunkiem listy zdarzeń wsadowych jest odbierany i przekazywany do funkcji Consumer
. Każdy nagłówek wiadomości jest również konwertowany na listę, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. Wspólne nagłówki identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są prezentowane jako pojedyncza wartość, ponieważ cała partia zdarzeń ma tę samą wartość. Aby uzyskać więcej informacji, zobacz nagłówki komunikatów usługi Event Hubs sekcji Spring Cloud Azure support for Spring Integration.
Nuta
Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany tryb punktu kontrolnego MANUAL
.
Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH
i MANUAL
.
BATCH
tryb jest trybem automatycznego tworzenia punktów kontrolnych, aby wskazać całą partię zdarzeń razem po odebraniu przez binder.
MANUAL
tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. W przypadku użycia Checkpointer
jest przekazywany do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.
Rozmiar partii można określić, ustawiając właściwości max-size
i max-wait-time
, które mają prefiks spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. Właściwość max-size
jest wymagana, a właściwość max-wait-time
jest opcjonalna. Aby uzyskać więcej informacji, zobacz sekcję Właściwości konsumenta.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Alternatywnie możesz również użyć szablonu startowego Spring Cloud Azure Stream Event Hubs, jak pokazano w poniższym przykładzie dla narzędzia Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguracja
Powiążnik udostępnia następujące trzy części opcji konfiguracji:
Właściwości konfiguracji połączenia
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.
Nuta
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Konfigurowalne właściwości połączenia spring-cloud-azure-stream-binder-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolowski | Określa, czy usługa Azure Event Hubs jest włączona. |
spring.cloud.azure.eventhubs.connection-string | Struna | Wartość parametrów połączenia przestrzeni nazw usługi Event Hubs. |
spring.cloud.azure.eventhubs.namespace | Struna | Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Struna | Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Struna | Niestandardowy adres punktu końcowego. |
Napiwek
Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla powiązania usługi Azure Stream Azure Stream w usłudze Spring Cloud. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure.
lub prefiksu spring.cloud.azure.eventhubs.
.
Binder obsługuje również Spring Could Azure Resource Manager domyślnie. Aby dowiedzieć się, jak pobrać parametry połączenia przy użyciu podmiotów zabezpieczeń, które nie zostały przyznane z rolami powiązanymi z Data
, zobacz sekcję podstawowego użyciaSpring Could Azure Resource Manager.
Właściwości konfiguracji punktu kontrolnego
Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.
Nuta
W wersji 4.0.0, jeśli właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie z nazwą z spring.cloud.stream.bindings.binding-name.destination.
Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-stream-binder-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolowski | Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Struna | Nazwa konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Struna | Klucz dostępu do konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Struna | Nazwa kontenera magazynu. |
Napiwek
Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure.
lub prefiksu spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Właściwości konfiguracji powiązania usługi Azure Event Hubs
Następujące opcje są podzielone na cztery sekcje: Właściwości konsumenta, Zaawansowane konfiguracje konsumentów, Właściwości producenta i Zaawansowane konfiguracje producenta.
Właściwości odbiorcy
Te właściwości są widoczne za pośrednictwem EventHubsConsumerProperties
.
Nuta
Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Event Hubs obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Konfigurowalne właściwości rozwiązania spring-cloud-azure-stream-binder-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Tryb punktu kontrolnego używany, gdy użytkownik decyduje o sposobie wyświetlania komunikatu punktu kontrolnego |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Liczba całkowita | Decyduje o ilości komunikatu dla każdej partycji do wykonania jednego punktu kontrolnego. Zacznie obowiązywać tylko wtedy, gdy jest używany PARTITION_COUNT tryb punktu kontrolnego. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Czas trwania | Decyduje o interwale czasu do wykonania jednego punktu kontrolnego. Zacznie obowiązywać tylko wtedy, gdy jest używany TIME tryb punktu kontrolnego. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Liczba całkowita | Maksymalna liczba zdarzeń w partii. Wymagany dla trybu batch-consumer. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Czas trwania | Maksymalny czas trwania partii. Zacznie obowiązywać tylko wtedy, gdy tryb batch-consumer jest włączony i jest opcjonalny. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Czas trwania | Czas trwania interwału aktualizacji. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Strategia równoważenia obciążenia. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Czas trwania | Czas trwania, po którym wygasa własność partycji. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolowski | Czy procesor zdarzeń powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Liczba całkowita | Liczba używana przez konsumenta do kontrolowania liczby zdarzeń, które odbiorca centrum zdarzeń będzie aktywnie odbierać i kolejkować lokalnie. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mapowanie przy użyciu klucza jako identyfikatora partycji i wartości StartPositionProperties |
Mapa zawierająca położenie zdarzenia do użycia dla każdej partycji, jeśli punkt kontrolny partycji nie istnieje w magazynie punktów kontrolnych. Ta mapa jest kluczem z identyfikatora partycji. |
Nuta
Konfiguracja initial-partition-event-position
akceptuje map
w celu określenia początkowej pozycji dla każdego centrum zdarzeń. W związku z tym jego kluczem jest identyfikator partycji, a wartość jest StartPositionProperties
, która zawiera właściwości przesunięcia, numeru sekwencji, w kolejce daty i tego, czy włącznie. Można na przykład ustawić ją jako
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Zaawansowana konfiguracja konsumenta
Powyższe połączenia, punktów kontrolnychi typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego odbiorcy powiązania, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Właściwości producenta
Te właściwości są widoczne za pośrednictwem EventHubsProducerProperties
.
Nuta
Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Event Hubs obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Konfigurowalne właściwości spring-cloud-azure-stream-binder-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolowski | Flaga przełącznika do synchronizacji producenta. Jeśli to prawda, producent będzie czekać na odpowiedź po operacji wysyłania. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | długi | Czas oczekiwania na odpowiedź po operacji wysyłania. Zacznie obowiązywać tylko wtedy, gdy producent synchronizacji jest włączony. |
Zaawansowana konfiguracja producenta
Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego producenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Podstawowe użycie
Wysyłanie i odbieranie komunikatów z/do usługi Event Hubs
Wypełnij opcje konfiguracji informacjami o poświadczeniach.
W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definiowanie dostawcy i konsumenta.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Obsługa partycjonowania
Zostanie utworzona PartitionSupplier
z informacjami o partycji udostępnianymi przez użytkownika w celu skonfigurowania informacji o partycji dotyczących wiadomości do wysłania. Poniższy schemat blokowy przedstawia proces uzyskiwania różnych priorytetów dla identyfikatora partycji i klucza:
Obsługa konsumentów usługi Batch
Podaj opcje konfiguracji wsadowej, jak pokazano w poniższym przykładzie:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definiowanie dostawcy i konsumenta.
W przypadku trybu tworzenia punktów kontrolnych jako
BATCH
można użyć następującego kodu do wysyłania komunikatów i używania ich w partiach.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
W przypadku trybu tworzenia punktów kontrolnych jako
MANUAL
można użyć następującego kodu do wysyłania komunikatów i używania/punktu kontrolnego w partiach.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Nuta
W trybie przetwarzania wsadowego domyślny typ zawartości powiązania strumienia Spring Cloud jest application/json
, dlatego upewnij się, że ładunek komunikatu jest zgodny z typem zawartości. Na przykład w przypadku używania domyślnego typu zawartości application/json
do odbierania komunikatów z ładunkiem String
ładunek powinien być JSON String
, otoczony podwójnymi cudzysłowymi dla oryginalnego tekstu String
. Podczas text/plain
typu zawartości może to być obiekt String
bezpośrednio. Aby uzyskać więcej informacji, zobacz Negocjowanie typu zawartości strumienia Spring Cloud.
Obsługa komunikatów o błędach
Obsługa komunikatów o błędach powiązania wychodzącego
Domyślnie platforma Spring Integration tworzy globalny kanał błędów o nazwie
errorChannel
. Skonfiguruj następujący punkt końcowy komunikatu, aby obsługiwać komunikaty o błędach powiązania wychodzącego.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Obsługa komunikatów o błędach powiązania przychodzącego
Powiązanie usługi Event Hubs spring Cloud Stream obsługuje jedno rozwiązanie do obsługi błędów dla powiązań komunikatów przychodzących: procedur obsługi błędów.
program obsługi błędów :
Usługa Spring Cloud Stream udostępnia mechanizm obsługi błędów niestandardowych, dodając
Consumer
, który akceptuje wystąpieniaErrorMessage
. Aby uzyskać więcej informacji, zobacz Obsługa komunikatów o błędach w dokumentacji usługi Spring Cloud Stream.Domyślna procedura obsługi błędów powiązania
Skonfiguruj pojedynczą
Consumer
fasolę, aby korzystać ze wszystkich komunikatów o błędach powiązania przychodzącego. Następująca funkcja domyślna subskrybuje każdy kanał błędów powiązania przychodzącego:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Należy również ustawić właściwość
spring.cloud.stream.default.error-handler-definition
na nazwę funkcji.Procedura obsługi błędów specyficzna dla powiązania
Skonfiguruj
Consumer
fasoli, aby korzystać z określonych komunikatów o błędach powiązania przychodzącego. Następująca funkcja subskrybuje określony kanał błędów powiązania przychodzącego i ma wyższy priorytet niż domyślna procedura obsługi błędów powiązania:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Należy również ustawić właściwość
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
na nazwę funkcji.
Nagłówki komunikatów usługi Event Hubs
Aby zapoznać się z podstawowymi nagłówkami komunikatów obsługiwanymi, zobacz sekcję nagłówków komunikatów usługi Event Hubsobsługa platformy Azure spring Cloud dla usługi Spring Integration.
Obsługa wielu powiązań
Połączenie z wieloma przestrzeniami nazw usługi Event Hubs jest również obsługiwane przy użyciu wielu powiązań. W tym przykładzie parametry połączenia są przykładowe. Obsługiwane są również poświadczenia jednostek usługi i tożsamości zarządzanych. Powiązane właściwości można ustawić w ustawieniach środowiska każdego powiązania.
Aby użyć wielu powiązań z usługą Event Hubs, skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Nuta
W poprzednim pliku aplikacji pokazano, jak skonfigurować pojedynczy domyślny element poller dla aplikacji do wszystkich powiązań. Jeśli chcesz skonfigurować narzędzie poller dla określonego powiązania, możesz użyć konfiguracji, takiej jak
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Musimy zdefiniować dwóch dostawców i dwóch konsumentów:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Aprowizowanie zasobów
Binder usługi Event Hubs obsługuje aprowizację centrum zdarzeń i grupy odbiorców. Użytkownicy mogą użyć następujących właściwości w celu włączenia aprowizacji.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
Próbki
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.
Powiązanie strumienia Spring Cloud dla usługi Azure Service Bus
Kluczowe pojęcia
Powiązanie strumienia Spring Cloud dla usługi Azure Service Bus zapewnia implementację powiązania dla struktury Spring Cloud Stream Framework. Ta implementacja używa adapterów kanału Spring Integration Service Bus na jego podstawie.
Zaplanowana wiadomość
Ten binder obsługuje przesyłanie komunikatów do tematu w celu opóźnionego przetwarzania. Użytkownicy mogą wysyłać zaplanowane komunikaty z nagłówkiem x-delay
wyrażając w milisekundach czas opóźnienia komunikatu. Wiadomość zostanie dostarczona do odpowiednich tematów po x-delay
milisekundach.
Grupa odbiorców
Temat usługi Service Bus zapewnia podobną obsługę grupy odbiorców jako platformy Apache Kafka, ale z niewielką różnicą logiki.
Ten binder opiera się na Subscription
tematu, który działa jako grupa odbiorców.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Alternatywnie możesz również użyć szablonu startowego spring Cloud Azure Stream Service Bus, jak pokazano w poniższym przykładzie dla narzędzia Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguracja
Binder udostępnia następujące dwie części opcji konfiguracji:
Właściwości konfiguracji połączenia
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.
Nuta
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Konfigurowalne właściwości połączenia spring-cloud-azure-stream-binder-servicebus:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolowski | Określa, czy usługa Azure Service Bus jest włączona. |
spring.cloud.azure.servicebus.connection-string | Struna | Wartość parametrów połączenia przestrzeni nazw usługi Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | Struna | Niestandardowy adres punktu końcowego do użycia podczas nawiązywania połączenia z usługą Service Bus. |
spring.cloud.azure.servicebus.namespace | Struna | Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Struna | Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus. |
Nuta
Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla powiązania usługi Azure Stream Bus spring Cloud. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure.
lub prefiksu spring.cloud.azure.servicebus.
.
Binder obsługuje również Spring Could Azure Resource Manager domyślnie. Aby dowiedzieć się, jak pobrać parametry połączenia przy użyciu podmiotów zabezpieczeń, które nie zostały przyznane z rolami powiązanymi z Data
, zobacz sekcję podstawowego użyciaSpring Could Azure Resource Manager.
Właściwości konfiguracji powiązania usługi Azure Service Bus
Następujące opcje są podzielone na cztery sekcje: Właściwości konsumenta, Zaawansowane konfiguracje konsumentów, Właściwości producenta i Zaawansowane konfiguracje producenta.
Właściwości odbiorcy
Te właściwości są widoczne za pośrednictwem ServiceBusConsumerProperties
.
Nuta
Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Service Bus obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Konfigurowalne właściwości rozwiązania spring-cloud-azure-stream-binder-servicebus:
Własność | Typ | Domyślny | Opis |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-odrzucone | boolowski | fałszywy | Jeśli komunikaty, które zakończyły się niepowodzeniem, są kierowane do biblioteki DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Liczba całkowita | 1 | Maksymalna liczba współbieżnych komunikatów, które powinien przetworzyć klient procesora usługi Service Bus. Po włączeniu sesji ma zastosowanie do każdej sesji. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Liczba całkowita | zero | Maksymalna liczba współbieżnych sesji do przetworzenia w danym momencie. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolowski | zero | Czy sesja jest włączona. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Liczba całkowita | 0 | Liczba wstępnego pobierania klienta procesora usługi Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Kolejka podrzędna | żaden | Typ kolejki podrzędnej do nawiązania połączenia. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Czas trwania | 5 m | Czas na kontynuowanie automatycznego odnawiania blokady. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Tryb odbierania klienta procesora usługi Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolowski | prawdziwy | Czy komunikaty mają być automatycznie rozliczane. W przypadku ustawienia wartości false nagłówek komunikatu Checkpointer zostanie dodany w celu umożliwienia deweloperom ręcznego rozliczania komunikatów. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabajty | Długi | 1024 | Maksymalny rozmiar kolejki/tematu w megabajtach, czyli rozmiar pamięci przydzielonej do kolejki/tematu. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Czas trwania | P10675199DT2H48M5.4775807S. (10675199 dni, 2 godziny, 48 minut, 5 sekund i 477 milisekund) | Czas trwania, po upływie którego komunikat wygaśnie, począwszy od momentu wysłania komunikatu do usługi Service Bus. |
Ważny
W przypadku korzystania z usługi Azure Resource Manager (ARM) należy skonfigurować właściwość spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
. Aby uzyskać więcej informacji, zobacz przykład servicebus-queue-binder-arm w witrynie GitHub.
Zaawansowana konfiguracja konsumenta
Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego konsumenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Właściwości producenta
Te właściwości są widoczne za pośrednictwem ServiceBusProducerProperties
.
Nuta
Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Service Bus obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Konfigurowalne właściwości narzędzia spring-cloud-azure-stream-binder-servicebus:
Własność | Typ | Domyślny | Opis |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolowski | fałszywy | Przełącz flagę synchronizacji producenta. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | długi | 10000 | Wartość limitu czasu wysyłania producenta. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | zero | Typ jednostki usługi Service Bus producenta, wymagany dla producenta powiązania. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabajty | Długi | 1024 | Maksymalny rozmiar kolejki/tematu w megabajtach, czyli rozmiar pamięci przydzielonej do kolejki/tematu. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Czas trwania | P10675199DT2H48M5.4775807S. (10675199 dni, 2 godziny, 48 minut, 5 sekund i 477 milisekund) | Czas trwania, po upływie którego komunikat wygaśnie, począwszy od momentu wysłania komunikatu do usługi Service Bus. |
Ważny
W przypadku korzystania z producenta powiązania należy skonfigurować właściwość spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
.
Zaawansowana konfiguracja producenta
Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego producenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Podstawowe użycie
Wysyłanie i odbieranie komunikatów z/do usługi Service Bus
Wypełnij opcje konfiguracji informacjami o poświadczeniach.
W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definiowanie dostawcy i konsumenta.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Obsługa klucza partycji
Powiązanie obsługuje partycjonowanie usługi Service Bus, umożliwiając ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.
Usługa Spring Cloud Stream udostępnia właściwość wyrażenia SpEL klucza partycji spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Na przykład ustawienie tej właściwości jako "'partitionKey-' + headers[<message-header-key>]"
i dodanie nagłówka o nazwie message-header-key. Usługa Spring Cloud Stream używa wartości dla tego nagłówka podczas oceniania wyrażenia w celu przypisania klucza partycji. Poniższy kod zawiera przykładowego producenta:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Obsługa sesji
Powiążnik obsługuje sesje komunikatów usługi Service Bus. Identyfikator sesji wiadomości można ustawić za pośrednictwem nagłówka komunikatu.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Nuta
Zgodnie z partycjonowania usługi Service Busidentyfikator sesji ma wyższy priorytet niż klucz partycji. Dlatego po ustawieniu zarówno nagłówków ServiceBusMessageHeaders#SESSION_ID
, jak i ServiceBusMessageHeaders#PARTITION_KEY
wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.
Obsługa komunikatów o błędach
Obsługa komunikatów o błędach powiązania wychodzącego
Domyślnie platforma Spring Integration tworzy globalny kanał błędów o nazwie
errorChannel
. Skonfiguruj następujący punkt końcowy komunikatu, aby obsłużyć komunikat o błędzie powiązania wychodzącego.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Obsługa komunikatów o błędach powiązania przychodzącego
Powiązanie usługi Service Bus spring Cloud Stream obsługuje dwa rozwiązania do obsługi błędów dla powiązań komunikatów przychodzących: procedury obsługi błędów i procedur obsługi błędów programu binder.
program obsługi błędów bindera:
Domyślna procedura obsługi błędów bindera obsługuje powiązanie przychodzące. Ta procedura obsługi służy do wysyłania komunikatów, które zakończyły się niepowodzeniem do kolejki utraconych komunikatów po włączeniu
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
. W przeciwnym razie komunikaty, które zakończyły się niepowodzeniem, zostaną porzucone. Procedura obsługi błędów bindera wzajemnie się wyklucza z innymi dostarczonymi procedurami obsługi błędów.program obsługi błędów :
Usługa Spring Cloud Stream udostępnia mechanizm obsługi błędów niestandardowych, dodając
Consumer
, który akceptuje wystąpieniaErrorMessage
. Aby uzyskać więcej informacji, zobacz Obsługa komunikatów o błędach w dokumentacji usługi Spring Cloud Stream.Domyślna procedura obsługi błędów powiązania
Skonfiguruj pojedynczą
Consumer
fasolę, aby korzystać ze wszystkich komunikatów o błędach powiązania przychodzącego. Następująca funkcja domyślna subskrybuje każdy kanał błędów powiązania przychodzącego:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Należy również ustawić właściwość
spring.cloud.stream.default.error-handler-definition
na nazwę funkcji.Procedura obsługi błędów specyficzna dla powiązania
Skonfiguruj
Consumer
fasoli, aby korzystać z określonych komunikatów o błędach powiązania przychodzącego. Poniższa funkcja subskrybuje określony kanał błędów powiązania przychodzącego o wyższym priorytcie niż procedura obsługi błędów powiązania domyślnego.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Należy również ustawić właściwość
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
na nazwę funkcji.
Nagłówki komunikatów usługi Service Bus
Aby zapoznać się z podstawowymi nagłówkami komunikatów obsługiwanymi, zobacz sekcję nagłówków komunikatów usługi Service Busobsługa platformy Spring Cloud na potrzeby integracji spring.
Nuta
Podczas ustawiania klucza partycji priorytet nagłówka komunikatu jest wyższy niż właściwość Spring Cloud Stream. Dlatego spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
zaczynają obowiązywać tylko wtedy, gdy żaden z nagłówków ServiceBusMessageHeaders#SESSION_ID
i ServiceBusMessageHeaders#PARTITION_KEY
nie jest skonfigurowany.
Obsługa wielu powiązań
Połączenie z wieloma przestrzeniami nazw usługi Service Bus jest również obsługiwane przy użyciu wielu powiązań. W tym przykładzie parametry połączenia są przykładowe. Obsługiwane są również poświadczenia jednostek usługi i tożsamości zarządzanych. Użytkownicy mogą ustawiać powiązane właściwości w ustawieniach środowiska każdego powiązania.
Aby użyć wielu powiązań usługi ServiceBus, skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Nuta
W poprzednim pliku aplikacji pokazano, jak skonfigurować pojedynczy domyślny element poller dla aplikacji do wszystkich powiązań. Jeśli chcesz skonfigurować narzędzie poller dla określonego powiązania, możesz użyć konfiguracji, takiej jak
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.potrzebujemy zdefiniowania dwóch dostawców i dwóch konsumentów
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Aprowizowanie zasobów
Powiązanie usługi Service Bus obsługuje aprowizację kolejki, tematu i subskrypcji, użytkownicy mogą używać następujących właściwości w celu włączenia aprowizacji.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
Dostosowywanie właściwości klienta usługi Service Bus
Deweloperzy mogą używać AzureServiceClientBuilderCustomizer
do dostosowywania właściwości klienta usługi Service Bus. Poniższy przykład dostosowuje właściwość sessionIdleTimeout
w ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Próbki
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.