Udostępnij za pośrednictwem


Obsługa platformy Azure platformy Spring dla platformy Spring Integration

Ten artykuł dotyczy:✅ w wersji 4.19.0 ✅ w wersji 5.19.0

Rozszerzenie Spring Integration Extension dla platformy Azure udostępnia karty integracji spring dla różnych usług udostępnianych przez zestaw Azure SDK dla języka Java. Zapewniamy obsługę integracji spring dla tych usług platformy Azure: Event Hubs, Service Bus, Storage Queue. Poniżej znajduje się lista obsługiwanych kart:

Integracja platformy Spring z usługą Azure Event Hubs

Kluczowe pojęcia

Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Może odbierać i przetwarzać miliony zdarzeń na sekundę. Dane wysyłane do centrum zdarzeń można przekształcać i przechowywać przy użyciu dowolnego dostawcy analizy w czasie rzeczywistym lub kart wsadowych/magazynowych.

Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych. Te karty zapewniają wyższy poziom abstrakcji dzięki obsłudze komunikacji wirtualnej, obsługi komunikatów i planowania platformy Spring. Projekt rozszerzenia Spring Integration for Event Hubs udostępnia karty i bramy kanału przychodzącego i wychodzącego dla usługi Azure Event Hubs.

Nuta

Interfejsy API obsługi RxJava są porzucane z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.

Grupa odbiorców

Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.

Obsługa partycjonowania

Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia obciążenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, która partycja jest własnością użytkownika. Po uruchomieniu nowego użytkownika spróbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równoważenia obciążenia.

Aby określić strategię równoważenia obciążenia, deweloperzy mogą używać EventHubsContainerProperties dla konfiguracji. Zobacz poniższej sekcji przykład konfigurowania EventHubsContainerProperties.

Obsługa konsumentów usługi Batch

EventHubsInboundChannelAdapter obsługuje tryb przetwarzania wsadowego. Aby ją włączyć, użytkownicy mogą określić tryb odbiornika jako ListenerMode.BATCH podczas konstruowania wystąpienia EventHubsInboundChannelAdapter. Po włączeniu komunikat, którego ładunek jest listą zdarzeń wsadowych, zostanie odebrana i przekazana do kanału podrzędnego. Każdy nagłówek wiadomości jest również konwertowany jako lista, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. W przypadku nagłówków wspólnych identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są one prezentowane jako pojedyncza wartość dla całej partii zdarzeń współudzielonych w tej samej partii zdarzeń. Aby uzyskać więcej informacji, zobacz sekcję Nagłówki komunikatów usługi Event Hubs.

Nuta

Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany ręczny tryb punktu kontrolnego.

Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH i MANUAL. BATCH tryb jest trybem automatycznego tworzenia punktów kontrolnych w celu sprawdzenia całej partii zdarzeń jednocześnie po ich odebraniu. MANUAL tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. W przypadku użycia Checkpointer zostaną przekazane do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.

Zasady zużywania wsadowego mogą być określane przez właściwości max-size i max-wait-time, gdzie max-size jest niezbędną właściwością, podczas gdy max-wait-time jest opcjonalna. Aby określić strategię zużywania wsadowego, deweloperzy mogą używać EventHubsContainerProperties dla konfiguracji. Zobacz poniższej sekcji przykład konfigurowania EventHubsContainerProperties.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguracja

Ten starter udostępnia następujące 3 części opcji konfiguracji:

Właściwości konfiguracji połączenia

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.

Nuta

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-eventhubs:

Własność Typ Opis
spring.cloud.azure.eventhubs.enabled boolowski Określa, czy usługa Azure Event Hubs jest włączona.
spring.cloud.azure.eventhubs.connection-string Struna Wartość parametrów połączenia przestrzeni nazw usługi Event Hubs.
spring.cloud.azure.eventhubs.namespace Struna Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Struna Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Struna Niestandardowy adres punktu końcowego.
spring.cloud.azure.eventhubs.shared-connection Boolowski Czy bazowe klasy EventProcessorClient i EventHubProducerAsyncClient używają tego samego połączenia. Domyślnie tworzone jest nowe połączenie i używane dla każdego utworzonego klienta centrum zdarzeń.

Właściwości konfiguracji punktu kontrolnego

Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.

Nuta

W wersji 4.0.0, jeśli właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie.

Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-starter-integration-eventhubs:

Własność Typ Opis
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolowski Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Struna Nazwa konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Struna Klucz dostępu do konta magazynu.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Struna Nazwa kontenera magazynu.

Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure. lub prefiksu spring.cloud.azure.eventhubs.processor.checkpoint-store.

Właściwości konfiguracji procesora centrum zdarzeń

EventHubsInboundChannelAdapter używa EventProcessorClient do korzystania z komunikatów z centrum zdarzeń, aby skonfigurować ogólne właściwości EventProcessorClient, deweloperzy mogą używać EventHubsContainerProperties do konfiguracji. Zobacz poniższej sekcji o sposobie pracy z EventHubsInboundChannelAdapter.

Podstawowe użycie

Wysyłanie komunikatów do usługi Azure Event Hubs

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli EventHubsTemplate do wysyłania komunikatów do usługi Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Odbieranie komunikatów z usługi Azure Event Hubs

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz EventHubsInboundChannelAdapter przy użyciu fasoli EventHubsMessageListenerContainer w celu odbierania komunikatów z usługi Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Utwórz powiązanie odbiorcy komunikatów za pomocą elementu EventHubsInboundChannelAdapter za pośrednictwem utworzonego wcześniej kanału komunikatów.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurowanie obiektu EventHubsMessageConverter w celu dostosowania obiektuMapper

EventHubsMessageConverter jest stosowana jako konfigurowalna fasola umożliwiająca użytkownikom dostosowywanie obiektuMapper.

Obsługa konsumentów usługi Batch

Aby korzystać z komunikatów z usługi Event Hubs w partiach, podobnie jak w przypadku powyższego przykładu, oprócz tego użytkownicy powinni ustawić opcje konfiguracji związane z przetwarzaniem wsadowym dla EventHubsInboundChannelAdapter.

Podczas tworzenia EventHubsInboundChannelAdaptertryb odbiornika powinien być ustawiony jako BATCH. Podczas tworzenia fasoli EventHubsMessageListenerContainerustaw tryb punktu kontrolnego jako MANUAL lub BATCH, a opcje wsadowe można skonfigurować zgodnie z potrzebami.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Nagłówki komunikatów usługi Event Hubs

W poniższej tabeli przedstawiono sposób mapowania właściwości komunikatów usługi Event Hubs na nagłówki komunikatów spring. W przypadku usługi Azure Event Hubs komunikat jest wywoływany jako event.

Mapowanie między komunikatami usługi Event Hubs/właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika rekordu:

Właściwości zdarzeń usługi Event Hubs Stałe nagłówka komunikatów spring Typ Opis
Czas w kolejce EventHubsHeaders#ENQUEUED_TIME Chwila Moment, w formacie UTC, kiedy zdarzenie zostało w kolejce w partycji centrum zdarzeń.
Przesunięcie EventHubsHeaders#OFFSET Długi Przesunięcie zdarzenia, gdy zostało odebrane z skojarzonej partycji centrum zdarzeń.
Klucz partycji AzureHeaders#PARTITION_KEY Struna Klucz wyznaczania wartości skrótu partycji, jeśli został ustawiony podczas pierwotnego publikowania zdarzenia.
Identyfikator partycji AzureHeaders#RAW_PARTITION_ID Struna Identyfikator partycji centrum zdarzeń.
Numer sekwencji EventHubsHeaders#SEQUENCE_NUMBER Długi Numer sekwencji przypisany do zdarzenia, gdy został on w kolejce w skojarzonej partycji centrum zdarzeń.
Ostatnie właściwości zdarzeń w kolejce EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Właściwości ostatniego zdarzenia w kolejce w tej partycji.
NIE AzureHeaders#CHECKPOINTER Checkpointer Nagłówek punktu kontrolnego dla określonego komunikatu.

Użytkownicy mogą analizować nagłówki komunikatów dla powiązanych informacji o każdym zdarzeniu. Aby ustawić nagłówek komunikatu dla zdarzenia, wszystkie dostosowane nagłówki zostaną umieszczone jako właściwość aplikacji zdarzenia, gdzie nagłówek jest ustawiony jako klucz właściwości. Po odebraniu zdarzeń z usługi Event Hubs wszystkie właściwości aplikacji zostaną przekonwertowane na nagłówek komunikatu.

Nuta

Nagłówki komunikatów klucza partycji, czas w kolejce, przesunięcie i numer sekwencji nie są obsługiwane ręcznie.

Po włączeniu trybu batch-consumer określone nagłówki wsadowych komunikatów są wymienione poniżej, który zawiera listę wartości z każdego pojedynczego zdarzenia usługi Event Hubs.

Mapowanie między komunikatami usługi Event Hubs/ właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika usługi Batch:

Właściwości zdarzeń usługi Event Hubs Stałe nagłówka komunikatów spring batch Typ Opis
Czas w kolejce EventHubsHeaders#ENQUEUED_TIME Lista błyskawicznych Lista wystąpień w formacie UTC, kiedy każde zdarzenie zostało w kolejce w partycji centrum zdarzeń.
Przesunięcie EventHubsHeaders#OFFSET Lista długich Lista przesunięcia każdego zdarzenia, które zostało odebrane ze skojarzonej partycji centrum zdarzeń.
Klucz partycji AzureHeaders#PARTITION_KEY Lista ciągów Lista klucza wyznaczania wartości skrótu partycji, jeśli została ustawiona podczas pierwotnego publikowania każdego zdarzenia.
Numer sekwencji EventHubsHeaders#SEQUENCE_NUMBER Lista długich Lista numerów sekwencji przypisanych do każdego zdarzenia w kolejce w skojarzonej partycji centrum zdarzeń.
Właściwości systemu EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista mapy Lista właściwości systemowych każdego zdarzenia.
Właściwości aplikacji EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista mapy Lista właściwości aplikacji każdego zdarzenia, w którym są umieszczane wszystkie dostosowane nagłówki komunikatów lub właściwości zdarzenia.

Nuta

Podczas publikowania komunikatów wszystkie powyższe nagłówki wsadowe zostaną usunięte z komunikatów, jeśli istnieją.

Próbki

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.

Integracja platformy Spring z usługą Azure Service Bus

Kluczowe pojęcia

Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych.

Projekt rozszerzenia Spring Integration for Azure Service Bus udostępnia karty kanału przychodzącego i wychodzącego dla usługi Azure Service Bus.

Nuta

Interfejsy API obsługi completableFuture zostały wycofane z wersji 2.10.0 i zostały zastąpione przez Reactor Core z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguracja

Ten starter udostępnia następujące 2 części opcji konfiguracji:

Właściwości konfiguracji połączenia

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.

Nuta

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-servicebus:

Własność Typ Opis
spring.cloud.azure.servicebus.enabled boolowski Określa, czy usługa Azure Service Bus jest włączona.
spring.cloud.azure.servicebus.connection-string Struna Wartość parametrów połączenia przestrzeni nazw usługi Service Bus.
spring.cloud.azure.servicebus.custom-endpoint-address Struna Niestandardowy adres punktu końcowego do użycia podczas nawiązywania połączenia z usługą Service Bus.
spring.cloud.azure.servicebus.namespace Struna Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Struna Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus.

Właściwości konfiguracji procesora usługi Service Bus

ServiceBusInboundChannelAdapter używa ServiceBusProcessorClient do korzystania z komunikatów, aby skonfigurować ogólne właściwości ServiceBusProcessorClient, deweloperzy mogą używać ServiceBusContainerProperties do konfiguracji. Zobacz poniższej sekcji o sposobie pracy z ServiceBusInboundChannelAdapter.

Podstawowe użycie

Wysyłanie komunikatów do usługi Azure Service Bus

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli ServiceBusTemplate, aby wysyłać komunikaty do usługi Service Bus, ustaw typ jednostki serviceBusTemplate. W tym przykładzie kolejka usługi Service Bus jest przykładowa.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Odbieranie komunikatów z usługi Azure Service Bus

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz ServiceBusInboundChannelAdapter przy użyciu fasoli ServiceBusMessageListenerContainer w celu odbierania komunikatów do usługi Service Bus. W tym przykładzie kolejka usługi Service Bus jest przykładowa.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Utwórz powiązanie odbiorcy komunikatów z ServiceBusInboundChannelAdapter za pośrednictwem utworzonego wcześniej kanału wiadomości.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurowanie obiektu ServiceBusMessageConverter w celu dostosowania obiektuMapper

ServiceBusMessageConverter jest stosowana jako konfigurowalna fasola umożliwiająca użytkownikom dostosowywanie ObjectMapper.

Nagłówki komunikatów usługi Service Bus

W przypadku niektórych nagłówków usługi Service Bus, które mogą być mapowane na wiele stałych nagłówków Spring, jest wyświetlany priorytet różnych nagłówków Spring.

Mapowanie między nagłówkami usługi Service Bus i nagłówkami spring:

Nagłówki i właściwości komunikatów usługi Service Bus Stałe nagłówka wiadomości spring Typ Konfigurowalne Opis
Typ zawartości MessageHeaders#CONTENT_TYPE Struna Tak Deskryptor RFC2045 typu zawartości komunikatu.
Identyfikator korelacji ServiceBusMessageHeaders#CORRELATION_ID Struna Tak Identyfikator korelacji komunikatu
Identyfikator komunikatu ServiceBusMessageHeaders#MESSAGE_ID Struna Tak Identyfikator komunikatu komunikatu, ten nagłówek ma wyższy priorytet niż MessageHeaders#ID.
Identyfikator komunikatu MessageHeaders#ID Identyfikator UUID Tak Identyfikator komunikatu wiadomości, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#MESSAGE_ID.
Klucz partycji ServiceBusMessageHeaders#PARTITION_KEY Struna Tak Klucz partycji do wysyłania komunikatu do jednostki podzielonej na partycje.
Odpowiedz na MessageHeaders#REPLY_CHANNEL Struna Tak Adres jednostki do wysyłania odpowiedzi.
Odpowiedz na identyfikator sesji ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Struna Tak Wartość właściwości ReplyToGroupId wiadomości.
Zaplanowana godzina kolejki utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Tak Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma wyższy priorytet niż AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Zaplanowana godzina kolejki utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Liczba całkowita Tak Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Identyfikator sesji ServiceBusMessageHeaders#SESSION_ID Struna Tak Identyfikator sesji dla jednostki obsługującej sesję.
Czas wygaśnięcia ServiceBusMessageHeaders#TIME_TO_LIVE Czas trwania Tak Czas trwania przed wygaśnięciem tej wiadomości.
Do ServiceBusMessageHeaders#TO Struna Tak Adres "do" komunikatu, zarezerwowany do użycia w przyszłości w scenariuszach routingu i obecnie ignorowany przez samego brokera.
Temat ServiceBusMessageHeaders#SUBJECT Struna Tak Temat wiadomości.
Opis błędu utraconych komunikatów ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Struna Nie Opis komunikatu, który został utracony.
Przyczyna utraconych listów ServiceBusMessageHeaders#DEAD_LETTER_REASON Struna Nie Powodem, dla którego wiadomość została martwa.
Źródło utraconych listów ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Struna Nie Jednostka, w której wiadomość została martwa.
Liczba dostaw ServiceBusMessageHeaders#DELIVERY_COUNT długi Nie Liczba przypadków dostarczenia tego komunikatu do klientów.
Numer sekwencji w kolejce ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER długi Nie Numer sekwencji w kolejce przypisany do komunikatu przez usługę Service Bus.
Czas w kolejce ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nie Data/godzina, w której ten komunikat został w kolejce w usłudze Service Bus.
Wygasa o godzinie ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nie Data/godzina wygaśnięcia tej wiadomości.
Blokowanie tokenu ServiceBusMessageHeaders#LOCK_TOKEN Struna Nie Token blokady dla bieżącego komunikatu.
Zablokowane do ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nie Data/godzina wygaśnięcia blokady tej wiadomości.
Numer sekwencji ServiceBusMessageHeaders#SEQUENCE_NUMBER długi Nie Unikatowy numer przypisany do komunikatu przez usługę Service Bus.
Stan ServiceBusMessageHeaders#STATE ServiceBusMessageState Nie Stan komunikatu, który może być aktywny, odroczony lub zaplanowany.

Obsługa klucza partycji

Ten starter obsługuje partycjonowanie usługi Service Bus, umożliwiając ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.

Zalecane: użyj ServiceBusMessageHeaders.PARTITION_KEY jako klucza nagłówka.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Niezalecane, ale obecnie obsługiwane: AzureHeaders.PARTITION_KEY jako klucz nagłówka.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nuta

Jeśli obie ServiceBusMessageHeaders.PARTITION_KEY i AzureHeaders.PARTITION_KEY są ustawione w nagłówkach komunikatów, preferowane jest ServiceBusMessageHeaders.PARTITION_KEY.

Obsługa sesji

W tym przykładzie pokazano, jak ręcznie ustawić identyfikator sesji komunikatu w aplikacji.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nuta

Gdy ServiceBusMessageHeaders.SESSION_ID jest ustawiona w nagłówkach wiadomości, a inny nagłówek ServiceBusMessageHeaders.PARTITION_KEY jest również ustawiony, wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.

Dostosowywanie właściwości klienta usługi Service Bus

Deweloperzy mogą używać AzureServiceClientBuilderCustomizer do dostosowywania właściwości klienta usługi Service Bus. Poniższy przykład dostosowuje właściwość sessionIdleTimeout w ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Próbki

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.

Integracja platformy Spring z kolejką usługi Azure Storage

Kluczowe pojęcia

Azure Queue Storage to usługa do przechowywania dużej liczby komunikatów. Uzyskujesz dostęp do komunikatów z dowolnego miejsca na świecie za pośrednictwem uwierzytelnionych wywołań przy użyciu protokołu HTTP lub HTTPS. Komunikat kolejki może mieć rozmiar do 64 KB. Kolejka może zawierać miliony komunikatów do całkowitego limitu pojemności konta magazynu. Kolejki są często używane do tworzenia listy prac w celu asynchronicznego przetwarzania.

Konfiguracja zależności

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguracja

Ten starter udostępnia następujące opcje konfiguracji:

Właściwości konfiguracji połączenia

Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z kolejką usługi Azure Storage.

Nuta

Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.

Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-storage-queue:

Własność Typ Opis
spring.cloud.azure.storage.queue.enabled boolowski Określa, czy kolejka usługi Azure Storage jest włączona.
spring.cloud.azure.storage.queue.connection-string Struna Wartość parametrów połączenia przestrzeni nazw kolejki magazynu.
spring.cloud.azure.storage.queue.accountName Struna Nazwa konta kolejki usługi Storage.
spring.cloud.azure.storage.queue.accountKey Struna Klucz konta kolejki usługi Storage.
spring.cloud.azure.storage.queue.endpoint Struna Punkt końcowy usługi Kolejka magazynu.
spring.cloud.azure.storage.queue.sasToken Struna Poświadczenia tokenu sygnatury dostępu współdzielonego
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion używana podczas tworzenia żądań interfejsu API.
spring.cloud.azure.storage.queue.messageEncoding Struna Kodowanie komunikatów w kolejce.

Podstawowe użycie

Wysyłanie komunikatów do kolejki usługi Azure Storage

  1. Wypełnij opcje konfiguracji poświadczeń.

    • W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  • W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Nuta

Dozwolone wartości dla tenant-id to: common, organizations, consumerslub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Utwórz DefaultMessageHandler za pomocą fasoli StorageQueueTemplate do wysyłania komunikatów do kolejki magazynu.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Wysyłanie komunikatów przy użyciu bramy.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Odbieranie komunikatów z kolejki usługi Azure Storage

  1. Wypełnij opcje konfiguracji poświadczeń.

  2. Utwórz fasolę kanału komunikatów jako kanał wejściowy.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Utwórz StorageQueueMessageSource za pomocą fasoli StorageQueueTemplate w celu odbierania komunikatów do kolejki magazynu.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Utwórz powiązanie odbiorcy komunikatów za pomocą elementu StorageQueueMessageSource utworzonego w ostatnim kroku za pośrednictwem utworzonego wcześniej kanału komunikatów.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Próbki

Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.