Udostępnij za pośrednictwem


Obsługa platformy Azure platformy Spring Cloud dla usługi Spring Cloud Stream

Ten artykuł dotyczy:✅ w wersji 4.19.0 ✅ w wersji 5.19.0

Spring Cloud Stream to struktura umożliwiająca tworzenie wysoce skalowalnych mikrousług opartych na zdarzeniach połączonych z udostępnionymi systemami obsługi komunikatów.

Platforma udostępnia elastyczny model programowania oparty na już ustalonych i znanych idiomach Spring i najlepszych rozwiązaniach. Te najlepsze rozwiązania obejmują obsługę trwałych semantyki pub/podsieci, grup odbiorców i partycji stanowych.

Bieżące implementacje bindera obejmują:

Powiązanie strumienia Spring Cloud dla usługi Azure Event Hubs

Kluczowe pojęcia

Powiązanie strumienia Spring Cloud dla usługi Azure Event Hubs zapewnia implementację powiązań dla platformy Spring Cloud Stream. Ta implementacja używa adapterów kanału spring Integration Event Hubs w jego podstawach. Z perspektywy projektu usługa Event Hubs jest podobna do platformy Kafka. Ponadto dostęp do usługi Event Hubs można uzyskać za pośrednictwem interfejsu API platformy Kafka. Jeśli projekt ma ścisłą zależność od interfejsu API platformy Kafka, możesz spróbować Event Hub przy użyciu przykładu interfejsu API platformy Kafka— przykład

Grupa odbiorców

Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.

Obsługa partycjonowania

Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, który użytkownik jest właścicielem której partycji. Po uruchomieniu nowego użytkownika próbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równowagi obciążenia.

Aby określić strategię równoważenia obciążenia, udostępniane są właściwości spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*. Aby uzyskać więcej informacji, zobacz sekcję Właściwości konsumenta.

Obsługa konsumentów usługi Batch

Binder spring Cloud Azure Stream Event Hubs obsługuje funkcję Spring Cloud Batch Consumer.

Aby pracować z trybem batch-consumer, ustaw właściwość spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode na wartość true. Po włączeniu komunikat z ładunkiem listy zdarzeń wsadowych jest odbierany i przekazywany do funkcji Consumer. Każdy nagłówek wiadomości jest również konwertowany na listę, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. Wspólne nagłówki identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są prezentowane jako pojedyncza wartość, ponieważ cała partia zdarzeń ma tę samą wartość. Aby uzyskać więcej informacji, zobacz nagłówki komunikatów usługi Event Hubs sekcji Spring Cloud Azure support for Spring Integration.

Nuta

Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany tryb punktu kontrolnego MANUAL.

Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH i MANUAL. BATCH tryb jest trybem automatycznego tworzenia punktów kontrolnych, aby wskazać całą partię zdarzeń razem po odebraniu przez binder. MANUAL tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. W przypadku użycia Checkpointer jest przekazywany do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.

Rozmiar partii można określić, ustawiając właściwości max-size i max-wait-time, które mają prefiks spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. Właściwość max-size jest wymagana, a właściwość max-wait-time jest opcjonalna. Aby uzyskać więcej informacji, zobacz sekcję Właściwości konsumenta.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Alternatywnie możesz również użyć szablonu startowego Spring Cloud Azure Stream Event Hubs, jak pokazano w poniższym przykładzie dla narzędzia Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguracja

Powiążnik udostępnia następujące trzy części opcji konfiguracji:

Właściwości konfiguracji połączenia

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.

Nuta

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Konfigurowalne właściwości połączenia spring-cloud-azure-stream-binder-eventhubs:

Własność Typ Opis
spring.cloud.azure.eventhubs.enabled boolowski Określa, czy usługa Azure Event Hubs jest włączona.
spring.cloud.azure.eventhubs.connection-string Struna Wartość parametrów połączenia przestrzeni nazw usługi Event Hubs.
spring.cloud.azure.eventhubs.namespace Struna Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Struna Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Struna Niestandardowy adres punktu końcowego.

Napiwek

Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla powiązania usługi Azure Stream Azure Stream w usłudze Spring Cloud. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure. lub prefiksu spring.cloud.azure.eventhubs..

Binder obsługuje również Spring Could Azure Resource Manager domyślnie. Aby dowiedzieć się, jak pobrać parametry połączenia przy użyciu podmiotów zabezpieczeń, które nie zostały przyznane z rolami powiązanymi z Data, zobacz sekcję podstawowego użyciaSpring Could Azure Resource Manager.

Właściwości konfiguracji punktu kontrolnego

Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.

Nuta

W wersji 4.0.0, jeśli właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie z nazwą z spring.cloud.stream.bindings.binding-name.destination.

Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-stream-binder-eventhubs:

Własność Typ Opis
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolowski Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Struna Nazwa konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Struna Klucz dostępu do konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Struna Nazwa kontenera magazynu.

Napiwek

Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure. lub prefiksu spring.cloud.azure.eventhubs.processor.checkpoint-store.

Właściwości konfiguracji powiązania usługi Azure Event Hubs

Następujące opcje są podzielone na cztery sekcje: Właściwości konsumenta, Zaawansowane konfiguracje konsumentów, Właściwości producenta i Zaawansowane konfiguracje producenta.

Właściwości odbiorcy

Te właściwości są widoczne za pośrednictwem EventHubsConsumerProperties.

Nuta

Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Event Hubs obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Konfigurowalne właściwości rozwiązania spring-cloud-azure-stream-binder-eventhubs:

Własność Typ Opis
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Tryb punktu kontrolnego używany, gdy użytkownik decyduje o sposobie wyświetlania komunikatu punktu kontrolnego
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Liczba całkowita Decyduje o ilości komunikatu dla każdej partycji do wykonania jednego punktu kontrolnego. Zacznie obowiązywać tylko wtedy, gdy jest używany PARTITION_COUNT tryb punktu kontrolnego.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Czas trwania Decyduje o interwale czasu do wykonania jednego punktu kontrolnego. Zacznie obowiązywać tylko wtedy, gdy jest używany TIME tryb punktu kontrolnego.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Liczba całkowita Maksymalna liczba zdarzeń w partii. Wymagany dla trybu batch-consumer.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Czas trwania Maksymalny czas trwania partii. Zacznie obowiązywać tylko wtedy, gdy tryb batch-consumer jest włączony i jest opcjonalny.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Czas trwania Czas trwania interwału aktualizacji.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Strategia równoważenia obciążenia.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Czas trwania Czas trwania, po którym wygasa własność partycji.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolowski Czy procesor zdarzeń powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Liczba całkowita Liczba używana przez konsumenta do kontrolowania liczby zdarzeń, które odbiorca centrum zdarzeń będzie aktywnie odbierać i kolejkować lokalnie.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapowanie przy użyciu klucza jako identyfikatora partycji i wartości StartPositionProperties Mapa zawierająca położenie zdarzenia do użycia dla każdej partycji, jeśli punkt kontrolny partycji nie istnieje w magazynie punktów kontrolnych. Ta mapa jest kluczem z identyfikatora partycji.

Nuta

Konfiguracja initial-partition-event-position akceptuje map w celu określenia początkowej pozycji dla każdego centrum zdarzeń. W związku z tym jego kluczem jest identyfikator partycji, a wartość jest StartPositionProperties, która zawiera właściwości przesunięcia, numeru sekwencji, w kolejce daty i tego, czy włącznie. Można na przykład ustawić ją jako

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Zaawansowana konfiguracja konsumenta

Powyższe połączenia, punktów kontrolnychi typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego odbiorcy powiązania, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Właściwości producenta

Te właściwości są widoczne za pośrednictwem EventHubsProducerProperties.

Nuta

Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Event Hubs obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Konfigurowalne właściwości spring-cloud-azure-stream-binder-eventhubs:

Własność Typ Opis
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolowski Flaga przełącznika do synchronizacji producenta. Jeśli to prawda, producent będzie czekać na odpowiedź po operacji wysyłania.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout długi Czas oczekiwania na odpowiedź po operacji wysyłania. Zacznie obowiązywać tylko wtedy, gdy producent synchronizacji jest włączony.
Zaawansowana konfiguracja producenta

Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego producenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Podstawowe użycie

Wysyłanie i odbieranie komunikatów z/do usługi Event Hubs

  1. Wypełnij opcje konfiguracji informacjami o poświadczeniach.

    • W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definiowanie dostawcy i konsumenta.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Obsługa partycjonowania

Zostanie utworzona PartitionSupplier z informacjami o partycji udostępnianymi przez użytkownika w celu skonfigurowania informacji o partycji dotyczących wiadomości do wysłania. Poniższy schemat blokowy przedstawia proces uzyskiwania różnych priorytetów dla identyfikatora partycji i klucza:

Diagram przedstawiający schemat blokowy procesu obsługi partycjonowania.

Obsługa konsumentów usługi Batch

  1. Podaj opcje konfiguracji wsadowej, jak pokazano w poniższym przykładzie:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definiowanie dostawcy i konsumenta.

    W przypadku trybu tworzenia punktów kontrolnych jako BATCHmożna użyć następującego kodu do wysyłania komunikatów i używania ich w partiach.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    W przypadku trybu tworzenia punktów kontrolnych jako MANUALmożna użyć następującego kodu do wysyłania komunikatów i używania/punktu kontrolnego w partiach.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Nuta

W trybie przetwarzania wsadowego domyślny typ zawartości powiązania strumienia Spring Cloud jest application/json, dlatego upewnij się, że ładunek komunikatu jest zgodny z typem zawartości. Na przykład w przypadku używania domyślnego typu zawartości application/json do odbierania komunikatów z ładunkiem String ładunek powinien być JSON String, otoczony podwójnymi cudzysłowymi dla oryginalnego tekstu String. Podczas text/plain typu zawartości może to być obiekt String bezpośrednio. Aby uzyskać więcej informacji, zobacz Negocjowanie typu zawartości strumienia Spring Cloud.

Obsługa komunikatów o błędach

  • Obsługa komunikatów o błędach powiązania wychodzącego

    Domyślnie platforma Spring Integration tworzy globalny kanał błędów o nazwie errorChannel. Skonfiguruj następujący punkt końcowy komunikatu, aby obsługiwać komunikaty o błędach powiązania wychodzącego.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Obsługa komunikatów o błędach powiązania przychodzącego

    Powiązanie usługi Event Hubs spring Cloud Stream obsługuje jedno rozwiązanie do obsługi błędów dla powiązań komunikatów przychodzących: procedur obsługi błędów.

    program obsługi błędów :

    Usługa Spring Cloud Stream udostępnia mechanizm obsługi błędów niestandardowych, dodając Consumer, który akceptuje wystąpienia ErrorMessage. Aby uzyskać więcej informacji, zobacz Obsługa komunikatów o błędach w dokumentacji usługi Spring Cloud Stream.

    • Domyślna procedura obsługi błędów powiązania

      Skonfiguruj pojedynczą Consumer fasolę, aby korzystać ze wszystkich komunikatów o błędach powiązania przychodzącego. Następująca funkcja domyślna subskrybuje każdy kanał błędów powiązania przychodzącego:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Należy również ustawić właściwość spring.cloud.stream.default.error-handler-definition na nazwę funkcji.

    • Procedura obsługi błędów specyficzna dla powiązania

      Skonfiguruj Consumer fasoli, aby korzystać z określonych komunikatów o błędach powiązania przychodzącego. Następująca funkcja subskrybuje określony kanał błędów powiązania przychodzącego i ma wyższy priorytet niż domyślna procedura obsługi błędów powiązania:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Należy również ustawić właściwość spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition na nazwę funkcji.

Nagłówki komunikatów usługi Event Hubs

Aby zapoznać się z podstawowymi nagłówkami komunikatów obsługiwanymi, zobacz sekcję nagłówków komunikatów usługi Event Hubsobsługa platformy Azure spring Cloud dla usługi Spring Integration.

Obsługa wielu powiązań

Połączenie z wieloma przestrzeniami nazw usługi Event Hubs jest również obsługiwane przy użyciu wielu powiązań. W tym przykładzie parametry połączenia są przykładowe. Obsługiwane są również poświadczenia jednostek usługi i tożsamości zarządzanych. Powiązane właściwości można ustawić w ustawieniach środowiska każdego powiązania.

  1. Aby użyć wielu powiązań z usługą Event Hubs, skonfiguruj następujące właściwości w pliku application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nuta

    W poprzednim pliku aplikacji pokazano, jak skonfigurować pojedynczy domyślny element poller dla aplikacji do wszystkich powiązań. Jeśli chcesz skonfigurować narzędzie poller dla określonego powiązania, możesz użyć konfiguracji, takiej jak spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Musimy zdefiniować dwóch dostawców i dwóch konsumentów:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Aprowizowanie zasobów

Binder usługi Event Hubs obsługuje aprowizację centrum zdarzeń i grupy odbiorców. Użytkownicy mogą użyć następujących właściwości w celu włączenia aprowizacji.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

Próbki

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.

Powiązanie strumienia Spring Cloud dla usługi Azure Service Bus

Kluczowe pojęcia

Powiązanie strumienia Spring Cloud dla usługi Azure Service Bus zapewnia implementację powiązania dla struktury Spring Cloud Stream Framework. Ta implementacja używa adapterów kanału Spring Integration Service Bus na jego podstawie.

Zaplanowana wiadomość

Ten binder obsługuje przesyłanie komunikatów do tematu w celu opóźnionego przetwarzania. Użytkownicy mogą wysyłać zaplanowane komunikaty z nagłówkiem x-delay wyrażając w milisekundach czas opóźnienia komunikatu. Wiadomość zostanie dostarczona do odpowiednich tematów po x-delay milisekundach.

Grupa odbiorców

Temat usługi Service Bus zapewnia podobną obsługę grupy odbiorców jako platformy Apache Kafka, ale z niewielką różnicą logiki. Ten binder opiera się na Subscription tematu, który działa jako grupa odbiorców.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Alternatywnie możesz również użyć szablonu startowego spring Cloud Azure Stream Service Bus, jak pokazano w poniższym przykładzie dla narzędzia Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguracja

Binder udostępnia następujące dwie części opcji konfiguracji:

Właściwości konfiguracji połączenia

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.

Nuta

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Konfigurowalne właściwości połączenia spring-cloud-azure-stream-binder-servicebus:

Własność Typ Opis
spring.cloud.azure.servicebus.enabled boolowski Określa, czy usługa Azure Service Bus jest włączona.
spring.cloud.azure.servicebus.connection-string Struna Wartość parametrów połączenia przestrzeni nazw usługi Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Struna Niestandardowy adres punktu końcowego do użycia podczas nawiązywania połączenia z usługą Service Bus.
spring.cloud.azure.servicebus.namespace Struna Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Struna Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus.

Nuta

Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla powiązania usługi Azure Stream Bus spring Cloud. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure. lub prefiksu spring.cloud.azure.servicebus..

Binder obsługuje również Spring Could Azure Resource Manager domyślnie. Aby dowiedzieć się, jak pobrać parametry połączenia przy użyciu podmiotów zabezpieczeń, które nie zostały przyznane z rolami powiązanymi z Data, zobacz sekcję podstawowego użyciaSpring Could Azure Resource Manager.

Właściwości konfiguracji powiązania usługi Azure Service Bus

Następujące opcje są podzielone na cztery sekcje: Właściwości konsumenta, Zaawansowane konfiguracje konsumentów, Właściwości producenta i Zaawansowane konfiguracje producenta.

Właściwości odbiorcy

Te właściwości są widoczne za pośrednictwem ServiceBusConsumerProperties.

Nuta

Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Service Bus obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Konfigurowalne właściwości rozwiązania spring-cloud-azure-stream-binder-servicebus:

Własność Typ Domyślny Opis
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-odrzucone boolowski fałszywy Jeśli komunikaty, które zakończyły się niepowodzeniem, są kierowane do biblioteki DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Liczba całkowita 1 Maksymalna liczba współbieżnych komunikatów, które powinien przetworzyć klient procesora usługi Service Bus. Po włączeniu sesji ma zastosowanie do każdej sesji.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Liczba całkowita zero Maksymalna liczba współbieżnych sesji do przetworzenia w danym momencie.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolowski zero Czy sesja jest włączona.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Liczba całkowita 0 Liczba wstępnego pobierania klienta procesora usługi Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Kolejka podrzędna żaden Typ kolejki podrzędnej do nawiązania połączenia.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Czas trwania 5 m Czas na kontynuowanie automatycznego odnawiania blokady.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Tryb odbierania klienta procesora usługi Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolowski prawdziwy Czy komunikaty mają być automatycznie rozliczane. W przypadku ustawienia wartości false nagłówek komunikatu Checkpointer zostanie dodany w celu umożliwienia deweloperom ręcznego rozliczania komunikatów.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabajty Długi 1024 Maksymalny rozmiar kolejki/tematu w megabajtach, czyli rozmiar pamięci przydzielonej do kolejki/tematu.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Czas trwania P10675199DT2H48M5.4775807S. (10675199 dni, 2 godziny, 48 minut, 5 sekund i 477 milisekund) Czas trwania, po upływie którego komunikat wygaśnie, począwszy od momentu wysłania komunikatu do usługi Service Bus.

Ważny

W przypadku korzystania z usługi Azure Resource Manager (ARM) należy skonfigurować właściwość spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type. Aby uzyskać więcej informacji, zobacz przykład servicebus-queue-binder-arm w witrynie GitHub.

Zaawansowana konfiguracja konsumenta

Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego konsumenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Właściwości producenta

Te właściwości są widoczne za pośrednictwem ServiceBusProducerProperties.

Nuta

Aby uniknąć powtórzeń, ponieważ wersja 4.19.0 i 5.19.0, usługa Spring Cloud Azure Stream Binder Service Bus obsługuje ustawianie wartości dla wszystkich kanałów w formacie spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Konfigurowalne właściwości narzędzia spring-cloud-azure-stream-binder-servicebus:

Własność Typ Domyślny Opis
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolowski fałszywy Przełącz flagę synchronizacji producenta.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout długi 10000 Wartość limitu czasu wysyłania producenta.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType zero Typ jednostki usługi Service Bus producenta, wymagany dla producenta powiązania.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabajty Długi 1024 Maksymalny rozmiar kolejki/tematu w megabajtach, czyli rozmiar pamięci przydzielonej do kolejki/tematu.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Czas trwania P10675199DT2H48M5.4775807S. (10675199 dni, 2 godziny, 48 minut, 5 sekund i 477 milisekund) Czas trwania, po upływie którego komunikat wygaśnie, począwszy od momentu wysłania komunikatu do usługi Service Bus.

Ważny

W przypadku korzystania z producenta powiązania należy skonfigurować właściwość spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type.

Zaawansowana konfiguracja producenta

Powyższe połączenia i typowe konfiguracje klienta zestawu Azure SDK obsługują dostosowywanie dla każdego producenta binder, który można skonfigurować przy użyciu prefiksu spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Podstawowe użycie

Wysyłanie i odbieranie komunikatów z/do usługi Service Bus

  1. Wypełnij opcje konfiguracji informacjami o poświadczeniach.

    • W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definiowanie dostawcy i konsumenta.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Obsługa klucza partycji

Powiązanie obsługuje partycjonowanie usługi Service Bus, umożliwiając ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.

Usługa Spring Cloud Stream udostępnia właściwość wyrażenia SpEL klucza partycji spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Na przykład ustawienie tej właściwości jako "'partitionKey-' + headers[<message-header-key>]" i dodanie nagłówka o nazwie message-header-key. Usługa Spring Cloud Stream używa wartości dla tego nagłówka podczas oceniania wyrażenia w celu przypisania klucza partycji. Poniższy kod zawiera przykładowego producenta:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Obsługa sesji

Powiążnik obsługuje sesje komunikatów usługi Service Bus. Identyfikator sesji wiadomości można ustawić za pośrednictwem nagłówka komunikatu.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Nuta

Zgodnie z partycjonowania usługi Service Busidentyfikator sesji ma wyższy priorytet niż klucz partycji. Dlatego po ustawieniu zarówno nagłówków ServiceBusMessageHeaders#SESSION_ID, jak i ServiceBusMessageHeaders#PARTITION_KEY wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.

Obsługa komunikatów o błędach

  • Obsługa komunikatów o błędach powiązania wychodzącego

    Domyślnie platforma Spring Integration tworzy globalny kanał błędów o nazwie errorChannel. Skonfiguruj następujący punkt końcowy komunikatu, aby obsłużyć komunikat o błędzie powiązania wychodzącego.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Obsługa komunikatów o błędach powiązania przychodzącego

    Powiązanie usługi Service Bus spring Cloud Stream obsługuje dwa rozwiązania do obsługi błędów dla powiązań komunikatów przychodzących: procedury obsługi błędów i procedur obsługi błędów programu binder.

    program obsługi błędów bindera:

    Domyślna procedura obsługi błędów bindera obsługuje powiązanie przychodzące. Ta procedura obsługi służy do wysyłania komunikatów, które zakończyły się niepowodzeniem do kolejki utraconych komunikatów po włączeniu spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected. W przeciwnym razie komunikaty, które zakończyły się niepowodzeniem, zostaną porzucone. Procedura obsługi błędów bindera wzajemnie się wyklucza z innymi dostarczonymi procedurami obsługi błędów.

    program obsługi błędów :

    Usługa Spring Cloud Stream udostępnia mechanizm obsługi błędów niestandardowych, dodając Consumer, który akceptuje wystąpienia ErrorMessage. Aby uzyskać więcej informacji, zobacz Obsługa komunikatów o błędach w dokumentacji usługi Spring Cloud Stream.

    • Domyślna procedura obsługi błędów powiązania

      Skonfiguruj pojedynczą Consumer fasolę, aby korzystać ze wszystkich komunikatów o błędach powiązania przychodzącego. Następująca funkcja domyślna subskrybuje każdy kanał błędów powiązania przychodzącego:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Należy również ustawić właściwość spring.cloud.stream.default.error-handler-definition na nazwę funkcji.

    • Procedura obsługi błędów specyficzna dla powiązania

      Skonfiguruj Consumer fasoli, aby korzystać z określonych komunikatów o błędach powiązania przychodzącego. Poniższa funkcja subskrybuje określony kanał błędów powiązania przychodzącego o wyższym priorytcie niż procedura obsługi błędów powiązania domyślnego.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Należy również ustawić właściwość spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition na nazwę funkcji.

Nagłówki komunikatów usługi Service Bus

Aby zapoznać się z podstawowymi nagłówkami komunikatów obsługiwanymi, zobacz sekcję nagłówków komunikatów usługi Service Busobsługa platformy Spring Cloud na potrzeby integracji spring.

Nuta

Podczas ustawiania klucza partycji priorytet nagłówka komunikatu jest wyższy niż właściwość Spring Cloud Stream. Dlatego spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression zaczynają obowiązywać tylko wtedy, gdy żaden z nagłówków ServiceBusMessageHeaders#SESSION_ID i ServiceBusMessageHeaders#PARTITION_KEY nie jest skonfigurowany.

Obsługa wielu powiązań

Połączenie z wieloma przestrzeniami nazw usługi Service Bus jest również obsługiwane przy użyciu wielu powiązań. W tym przykładzie parametry połączenia są przykładowe. Obsługiwane są również poświadczenia jednostek usługi i tożsamości zarządzanych. Użytkownicy mogą ustawiać powiązane właściwości w ustawieniach środowiska każdego powiązania.

  1. Aby użyć wielu powiązań usługi ServiceBus, skonfiguruj następujące właściwości w pliku application.yml:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Nuta

    W poprzednim pliku aplikacji pokazano, jak skonfigurować pojedynczy domyślny element poller dla aplikacji do wszystkich powiązań. Jeśli chcesz skonfigurować narzędzie poller dla określonego powiązania, możesz użyć konfiguracji, takiej jak spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. potrzebujemy zdefiniowania dwóch dostawców i dwóch konsumentów

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Aprowizowanie zasobów

Powiązanie usługi Service Bus obsługuje aprowizację kolejki, tematu i subskrypcji, użytkownicy mogą używać następujących właściwości w celu włączenia aprowizacji.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

Dostosowywanie właściwości klienta usługi Service Bus

Deweloperzy mogą używać AzureServiceClientBuilderCustomizer do dostosowywania właściwości klienta usługi Service Bus. Poniższy przykład dostosowuje właściwość sessionIdleTimeout w ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Próbki

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.