Udostępnij za pośrednictwem


Spring Cloud pomoc techniczna platformy Azure for Spring Integration

Ten artykuł dotyczy: ✔️ wersja 4.14.0 ✔️ w wersji 5.8.0

Rozszerzenie Spring Integration Extension dla platformy Azure udostępnia karty integracji spring dla różnych usług udostępnianych przez zestaw Azure SDK dla języka Java. Zapewniamy obsługę integracji spring dla tych usług platformy Azure: Event Hubs, Service Bus, Storage Queue. Poniżej znajduje się lista obsługiwanych kart:

Integracja platformy Spring z usługą Azure Event Hubs

Najważniejsze pojęcia

Usługa Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Może odbierać i przetwarzać miliony zdarzeń na sekundę. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania.

Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych. Te karty zapewniają wyższy poziom abstrakcji dzięki obsłudze komunikacji wirtualnej, obsługi komunikatów i planowania platformy Spring. Projekt rozszerzenia Spring Integration for Event Hubs udostępnia karty i bramy kanału przychodzącego i wychodzącego dla usługi Azure Event Hubs.

Uwaga

Interfejsy API obsługi RxJava są porzucane z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.

Grupa konsumentów

Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.

Obsługa partycjonowania

Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia obciążenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, która partycja jest własnością użytkownika. Po uruchomieniu nowego użytkownika spróbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równoważenia obciążenia.

Aby określić strategię równoważenia obciążenia, deweloperzy mogą używać EventHubsContainerProperties jej do konfiguracji. Zapoznaj się z następującą sekcją , aby zapoznać się z przykładem konfigurowania EventHubsContainerPropertiesprogramu .

Obsługa konsumentów usługi Batch

Element EventHubsInboundChannelAdapter obsługuje tryb przetwarzania wsadowego. Aby ją włączyć, użytkownicy mogą określić tryb odbiornika tak, jak ListenerMode.BATCH podczas konstruowania EventHubsInboundChannelAdapter wystąpienia. Po włączeniu komunikat, którego ładunek jest listą zdarzeń wsadowych, zostanie odebrany i przekazany do kanału podrzędnego. Każdy nagłówek wiadomości jest również konwertowany jako lista, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. W przypadku nagłówków wspólnych identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są one prezentowane jako pojedyncza wartość dla całej partii zdarzeń współudzielonych w tej samej partii zdarzeń. Aby uzyskać więcej informacji, zobacz sekcję Nagłówki komunikatów usługi Event Hubs.

Uwaga

Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany tryb RĘCZNEgo punktu kontrolnego.

Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH i MANUAL. BATCH tryb to tryb automatycznego tworzenia punktów kontrolnych umożliwiający sprawdzenie całej partii zdarzeń po ich odebraniu. MANUAL tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. Gdy jest używany, program Checkpointer zostanie przekazany do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.

Zasady zużywania wsadowego max-size można określić za pomocą właściwości i max-wait-time, gdzie max-size jest to wymagana właściwość, chociaż max-wait-time jest opcjonalna. Aby określić strategię zużywania wsadowego, deweloperzy mogą używać EventHubsContainerProperties jej do konfiguracji. Zapoznaj się z następującą sekcją , aby zapoznać się z przykładem konfigurowania EventHubsContainerPropertiesprogramu .

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfigurowanie

Ten starter udostępnia następujące 3 części opcji konfiguracji:

Właściwości konfiguracji Połączenie ion

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.

Uwaga

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Połączenie ion konfigurowalne właściwości spring-cloud-azure-starter-integration-eventhubs:

Właściwość Type Opis
spring.cloud.azure.eventhubs.enabled boolean Określa, czy usługa Azure Event Hubs jest włączona.
spring.cloud.azure.eventhubs.connection-string String Przestrzeń nazw usługi Event Hubs parametry połączenia wartość.
spring.cloud.azure.eventhubs.namespace String Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name String Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address String Niestandardowy adres punktu końcowego.
spring.cloud.azure.eventhubs.shared-connection Wartość logiczna Czy bazowe klasy EventProcessorClient i EventHubProducerAsyncClient używają tego samego połączenia. Domyślnie tworzone jest nowe połączenie i używane dla każdego utworzonego klienta centrum zdarzeń.

Właściwości konfiguracji punktu kontrolnego

Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.

Uwaga

W wersji 4.0.0 właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie.

Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-starter-integration-eventhubs:

Właściwość Type Opis
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Wartość logiczna Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String Nazwa konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Klucz dostępu do konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Nazwa kontenera magazynu.

Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloud i można je skonfigurować za pomocą ujednoliconego prefiksu lub prefiksu spring.cloud.azure.spring.cloud.azure.eventhubs.processor.checkpoint-store.

Właściwości konfiguracji procesora centrum zdarzeń

Funkcja EventHubsInboundChannelAdapter używa elementu EventProcessorClient do korzystania z komunikatów z centrum zdarzeń w celu skonfigurowania ogólnych właściwości EventProcessorClientprogramu , których deweloperzy mogą używać EventHubsContainerProperties do konfiguracji. Zapoznaj się z następującą sekcją dotyczącą pracy z EventHubsInboundChannelAdapterprogramem .

Podstawowy sposób użycia

Wysyłanie komunikatów do usługi Azure Event Hubs

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Uwaga

Dozwolone tenant-id wartości to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli, EventHubsTemplate aby wysyłać komunikaty do usługi Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Odbieranie komunikatów z usługi Azure Event Hubs

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz EventHubsInboundChannelAdapter za pomocą fasoli EventHubsMessageListenerContainer , aby odbierać komunikaty z usługi Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Utwórz powiązanie odbiorcy komunikatów za pomocą elementu EventHubsInboundChannelAdapter za pośrednictwem utworzonego wcześniej kanału komunikatów.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurowanie obiektu EventHubsMessageConverter w celu dostosowania obiektuMapper

EventHubsMessageConverter jest konfigurowalny fasola, aby umożliwić użytkownikom dostosowywanie obiektu ObjectMapper.

Obsługa konsumentów usługi Batch

Aby korzystać z komunikatów z usługi Event Hubs w partiach, jest podobne do powyższego przykładu, oprócz tego użytkownicy powinni ustawić opcje konfiguracji związane z wsadem dla EventHubsInboundChannelAdapterelementu .

Podczas tworzenia EventHubsInboundChannelAdapternależy ustawić tryb odbiornika jako BATCH. Podczas tworzenia fasoli programu ustaw tryb punktu kontrolnego EventHubsMessageListenerContainerjako MANUAL lub BATCH, a opcje wsadowe można skonfigurować zgodnie z potrzebami.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Nagłówki komunikatów usługi Event Hubs

W poniższej tabeli przedstawiono sposób mapowania właściwości komunikatów usługi Event Hubs na nagłówki komunikatów spring. W przypadku usługi Azure Event Hubs komunikat jest wywoływany jako event.

Mapowanie między komunikatami usługi Event Hubs/właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika rekordu:

Właściwości zdarzeń usługi Event Hubs Stałe nagłówka komunikatów spring Type Opis
Czas w kolejce EventHubsHeaders#ENQUEUED_TIME Błyskawiczne Moment, w formacie UTC, kiedy zdarzenie zostało w kolejce w partycji centrum zdarzeń.
Przesunięcie EventHubsHeaders#OFFSET Długi Przesunięcie zdarzenia, gdy zostało odebrane z skojarzonej partycji centrum zdarzeń.
Klucz partycji AzureHeaders#PARTITION_KEY String Klucz wyznaczania wartości skrótu partycji, jeśli został ustawiony podczas pierwotnego publikowania zdarzenia.
Identyfikator partycji AzureHeaders#RAW_PARTITION_ID String Identyfikator partycji centrum zdarzeń.
Numer sekwencyjny EventHubsHeaders#SEQUENCE_NUMBER Długi Numer sekwencji przypisany do zdarzenia, gdy został on w kolejce w skojarzonej partycji centrum zdarzeń.
Ostatnie właściwości zdarzeń w kolejce EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Właściwości ostatniego zdarzenia w kolejce w tej partycji.
NA AzureHeaders#CHECKPOINTER Checkpointer Nagłówek punktu kontrolnego dla określonego komunikatu.

Użytkownicy mogą analizować nagłówki komunikatów dla powiązanych informacji o każdym zdarzeniu. Aby ustawić nagłówek komunikatu dla zdarzenia, wszystkie dostosowane nagłówki zostaną umieszczone jako właściwość aplikacji zdarzenia, gdzie nagłówek jest ustawiony jako klucz właściwości. Po odebraniu zdarzeń z usługi Event Hubs wszystkie właściwości aplikacji zostaną przekonwertowane na nagłówek komunikatu.

Uwaga

Nagłówki komunikatów klucza partycji, czas w kolejce, przesunięcie i numer sekwencji nie są obsługiwane ręcznie.

Po włączeniu trybu batch-consumer określone nagłówki wsadowych komunikatów są wymienione poniżej, który zawiera listę wartości z każdego pojedynczego zdarzenia usługi Event Hubs.

Mapowanie między komunikatami usługi Event Hubs/ właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika usługi Batch:

Właściwości zdarzeń usługi Event Hubs Stałe nagłówka komunikatów spring batch Type Opis
Czas w kolejce EventHubsHeaders#ENQUEUED_TIME Lista błyskawicznych Lista wystąpień w formacie UTC, kiedy każde zdarzenie zostało w kolejce w partycji centrum zdarzeń.
Przesunięcie EventHubsHeaders#OFFSET Lista długich Lista przesunięcia każdego zdarzenia, które zostało odebrane ze skojarzonej partycji centrum zdarzeń.
Klucz partycji AzureHeaders#PARTITION_KEY Lista ciągów Lista klucza wyznaczania wartości skrótu partycji, jeśli została ustawiona podczas pierwotnego publikowania każdego zdarzenia.
Numer sekwencyjny EventHubsHeaders#SEQUENCE_NUMBER Lista długich Lista numerów sekwencji przypisanych do każdego zdarzenia w kolejce w skojarzonej partycji centrum zdarzeń.
Właściwości systemu EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista mapy Lista właściwości systemowych każdego zdarzenia.
Właściwości aplikacji EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista mapy Lista właściwości aplikacji każdego zdarzenia, w którym są umieszczane wszystkie dostosowane nagłówki komunikatów lub właściwości zdarzenia.

Uwaga

Podczas publikowania komunikatów wszystkie powyższe nagłówki wsadowe zostaną usunięte z komunikatów, jeśli istnieją.

Przykłady

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.

Integracja platformy Spring z usługą Azure Service Bus

Najważniejsze pojęcia

Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych.

Projekt rozszerzenia Spring Integration for Azure Service Bus udostępnia karty kanału przychodzącego i wychodzącego dla usługi Azure Service Bus.

Uwaga

Interfejsy API obsługi completableFuture zostały wycofane z wersji 2.10.0 i zostały zastąpione przez Reactor Core z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfigurowanie

Ten starter udostępnia następujące 2 części opcji konfiguracji:

właściwości konfiguracji Połączenie ion

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.

Uwaga

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Połączenie ion konfigurowalne właściwości rozwiązania spring-cloud-azure-starter-integration-servicebus:

Właściwość Type Opis
spring.cloud.azure.servicebus.enabled boolean Określa, czy usługa Azure Service Bus jest włączona.
spring.cloud.azure.servicebus.connection-string String Przestrzeń nazw usługi Service Bus parametry połączenia wartość.
spring.cloud.azure.servicebus.namespace String Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name String Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus.

Właściwości konfiguracji procesora usługi Service Bus

Funkcja ServiceBusInboundChannelAdapter używa elementu ServiceBusProcessorClient do korzystania z komunikatów ServiceBusProcessorClientw celu skonfigurowania ogólnych właściwości elementu , których deweloperzy mogą używać ServiceBusContainerProperties do konfiguracji. Zapoznaj się z następującą sekcją dotyczącą pracy z ServiceBusInboundChannelAdapterprogramem .

Podstawowy sposób użycia

Wysyłanie komunikatów do usługi Azure Service Bus

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Uwaga

Dozwolone tenant-id wartości to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.

  • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Uwaga

Dozwolone tenant-id wartości to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli, ServiceBusTemplate aby wysyłać komunikaty do usługi Service Bus, ustaw typ jednostki serviceBusTemplate. W tym przykładzie kolejka usługi Service Bus jest przykładowa.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Odbieranie komunikatów z usługi Azure Service Bus

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz ServiceBusInboundChannelAdapter za pomocą fasoli ServiceBusMessageListenerContainer , aby odbierać komunikaty do usługi Service Bus. W tym przykładzie kolejka usługi Service Bus jest przykładowa.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Utwórz powiązanie ServiceBusInboundChannelAdapter odbiorcy komunikatów za pośrednictwem utworzonego wcześniej kanału komunikatów.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurowanie obiektu ServiceBusMessageConverter w celu dostosowania obiektuMapper

ServiceBusMessageConverter element jest konfigurowalny, aby umożliwić użytkownikom dostosowywanie ObjectMapperelementu .

Nagłówki komunikatów usługi Service Bus

W przypadku niektórych nagłówków usługi Service Bus, które mogą być mapowane na wiele stałych nagłówków Spring, jest wyświetlany priorytet różnych nagłówków Spring.

Mapowanie między nagłówkami usługi Service Bus i nagłówkami spring:

Nagłówki i właściwości komunikatów usługi Service Bus Stałe nagłówka wiadomości spring Typ Konfigurowalny opis
Typ zawartości MessageHeaders#CONTENT_TYPE String Tak Deskryptor RFC2045 typu zawartości komunikatu.
Identyfikator korelacji ServiceBusMessageHeaders#CORRELATION_ID String Tak Identyfikator korelacji komunikatu
Identyfikator komunikatu ServiceBusMessageHeaders#MESSAGE_ID String Tak Identyfikator wiadomości, ten nagłówek ma wyższy priorytet niż MessageHeaders#ID.
Identyfikator komunikatu MessageHeaders#ID UUID Tak Identyfikator komunikatu wiadomości, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#MESSAGE_ID.
Klucz partycji ServiceBusMessageHeaders#PARTITION_KEY String Tak Klucz partycji do wysyłania komunikatu do jednostki podzielonej na partycje.
Odpowiedz MessageHeaders#REPLY_CHANNEL String Tak Adres jednostki do wysyłania odpowiedzi.
Odpowiedz na identyfikator sesji ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String Tak Wartość właściwości ReplyToGroupId wiadomości.
Zaplanowana godzina kolejki utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Tak Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma wyższy priorytet niż AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Zaplanowana godzina kolejki utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Liczba całkowita Tak Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Identyfikator sesji ServiceBusMessageHeaders#SESSION_ID String Tak Identyfikator sesji dla jednostki obsługującej sesję.
Time to live (Czas wygaśnięcia) ServiceBusMessageHeaders#TIME_TO_LIVE Czas trwania Tak Czas trwania przed wygaśnięciem tej wiadomości.
To ServiceBusMessageHeaders#TO String Tak Adres "do" komunikatu, zarezerwowany do użycia w przyszłości w scenariuszach routingu i obecnie ignorowany przez samego brokera.
Subject ServiceBusMessageHeaders#SUBJECT String Tak Temat wiadomości.
Opis błędu utraconych komunikatów ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String Nie Opis komunikatu, który został utracony.
Przyczyna utraconych listów ServiceBusMessageHeaders#DEAD_LETTER_REASON String Nie Powodem, dla którego wiadomość została martwa.
Źródło utraconych listów ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String Nie Jednostka, w której wiadomość została martwa.
Liczba dostaw ServiceBusMessageHeaders#DELIVERY_COUNT długi Nie. Liczba przypadków dostarczenia tego komunikatu do klientów.
Numer sekwencji w kolejce ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER długi Nie. Numer sekwencji w kolejce przypisany do komunikatu przez usługę Service Bus.
Czas w kolejce ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nie. Data/godzina, w której ten komunikat został w kolejce w usłudze Service Bus.
Wygasa o godzinie ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nie. Data/godzina wygaśnięcia tej wiadomości.
Blokowanie tokenu ServiceBusMessageHeaders#LOCK_TOKEN String Nie Token blokady dla bieżącego komunikatu.
Zablokowane do ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nie. Data/godzina wygaśnięcia blokady tej wiadomości.
Numer sekwencyjny ServiceBusMessageHeaders#SEQUENCE_NUMBER długi Nie. Unikatowy numer przypisany do komunikatu przez usługę Service Bus.
Stan ServiceBusMessageHeaders#STATE ServiceBusMessageState Nie. Stan komunikatu, który może być aktywny, odroczony lub zaplanowany.

Obsługa klucza partycji

Ten starter obsługuje partycjonowanie usługi Service Bus, zezwalając na ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.

Zalecane: użyj ServiceBusMessageHeaders.PARTITION_KEY jako klucza nagłówka.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Niezalecane, ale obecnie obsługiwane:AzureHeaders.PARTITION_KEY jako klucz nagłówka.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Uwaga

Jeśli obie ServiceBusMessageHeaders.PARTITION_KEY opcje i AzureHeaders.PARTITION_KEY są ustawione w nagłówkach komunikatów, ServiceBusMessageHeaders.PARTITION_KEY jest preferowana.

Obsługa sesji

W tym przykładzie pokazano, jak ręcznie ustawić identyfikator sesji komunikatu w aplikacji.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Uwaga

Po ustawieniu elementu ServiceBusMessageHeaders.SESSION_ID w nagłówkach wiadomości i ustawieniu innego ServiceBusMessageHeaders.PARTITION_KEY nagłówka wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.

Przykłady

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.

Integracja platformy Spring z kolejką usługi Azure Storage

Najważniejsze pojęcia

Azure Queue Storage to usługa służąca do przechowywania dużej liczby komunikatów. Uzyskujesz dostęp do komunikatów z dowolnego miejsca na świecie za pośrednictwem uwierzytelnionych wywołań przy użyciu protokołu HTTP lub HTTPS. Komunikat kolejki może mieć rozmiar do 64 KB. Kolejka może zawierać miliony komunikatów do całkowitego limitu pojemności konta magazynu. Kolejki są często używane do tworzenia listy prac w celu asynchronicznego przetwarzania.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfigurowanie

Ten starter udostępnia następujące opcje konfiguracji:

właściwości konfiguracji Połączenie ion

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z kolejką usługi Azure Storage.

Uwaga

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Połączenie ion konfigurowalne właściwości spring-cloud-azure-starter-integration-storage-queue:

Właściwość Type Opis
spring.cloud.azure.storage.queue.enabled boolean Określa, czy kolejka usługi Azure Storage jest włączona.
spring.cloud.azure.storage.queue.connection-string String Przestrzeń nazw kolejki magazynu parametry połączenia wartość.
spring.cloud.azure.storage.queue.accountName String Nazwa konta kolejki usługi Storage.
spring.cloud.azure.storage.queue.accountKey String Klucz konta kolejki usługi Storage.
spring.cloud.azure.storage.queue.endpoint String Punkt końcowy usługi Kolejka magazynu.
spring.cloud.azure.storage.queue.sasToken String Poświadczenia tokenu sygnatury dostępu współdzielonego
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion używana podczas tworzenia żądań interfejsu API.
spring.cloud.azure.storage.queue.messageEncoding String Kodowanie komunikatów w kolejce.

Podstawowy sposób użycia

Wysyłanie komunikatów do kolejki usługi Azure Storage

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Uwaga

Dozwolone tenant-id wartości to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.

  • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Uwaga

Dozwolone tenant-id wartości to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli, StorageQueueTemplate aby wysyłać komunikaty do kolejki magazynu.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Odbieranie komunikatów z kolejki usługi Azure Storage

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz StorageQueueMessageSource za StorageQueueTemplate pomocą fasoli, aby odbierać komunikaty do kolejki magazynu.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Utwórz powiązanie odbiorcy komunikatów za pomocą elementu StorageQueueMessageSource utworzonego w ostatnim kroku za pośrednictwem utworzonego wcześniej kanału komunikatów.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Przykłady

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.