Spring Cloud pomoc techniczna platformy Azure for Spring Integration
Ten artykuł dotyczy: ✔️ wersja 4.14.0 ✔️ w wersji 5.8.0
Rozszerzenie Spring Integration Extension dla platformy Azure udostępnia karty integracji spring dla różnych usług udostępnianych przez zestaw Azure SDK dla języka Java. Zapewniamy obsługę integracji spring dla tych usług platformy Azure: Event Hubs, Service Bus, Storage Queue. Poniżej znajduje się lista obsługiwanych kart:
spring-cloud-azure-starter-integration-eventhubs
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Event Hubs (Integracja spring z usługą Azure Event Hubs)spring-cloud-azure-starter-integration-servicebus
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Service Bus (Integracja spring z usługą Azure Service Bus)spring-cloud-azure-starter-integration-storage-queue
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Storage Queue (Integracja platformy Spring z kolejką usługi Azure Storage)
Integracja platformy Spring z usługą Azure Event Hubs
Najważniejsze pojęcia
Usługa Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Może odbierać i przetwarzać miliony zdarzeń na sekundę. Dane wysłane do centrum zdarzeń mogą zostać przekształcone i zmagazynowane przy użyciu dowolnego dostawcy analityki czasu rzeczywistego lub adapterów przetwarzania wsadowego/magazynowania.
Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych. Te karty zapewniają wyższy poziom abstrakcji dzięki obsłudze komunikacji wirtualnej, obsługi komunikatów i planowania platformy Spring. Projekt rozszerzenia Spring Integration for Event Hubs udostępnia karty i bramy kanału przychodzącego i wychodzącego dla usługi Azure Event Hubs.
Uwaga
Interfejsy API obsługi RxJava są porzucane z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.
Grupa konsumentów
Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.
Obsługa partycjonowania
Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia obciążenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, która partycja jest własnością użytkownika. Po uruchomieniu nowego użytkownika spróbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równoważenia obciążenia.
Aby określić strategię równoważenia obciążenia, deweloperzy mogą używać EventHubsContainerProperties
jej do konfiguracji. Zapoznaj się z następującą sekcją , aby zapoznać się z przykładem konfigurowania EventHubsContainerProperties
programu .
Obsługa konsumentów usługi Batch
Element EventHubsInboundChannelAdapter
obsługuje tryb przetwarzania wsadowego. Aby ją włączyć, użytkownicy mogą określić tryb odbiornika tak, jak ListenerMode.BATCH
podczas konstruowania EventHubsInboundChannelAdapter
wystąpienia.
Po włączeniu komunikat, którego ładunek jest listą zdarzeń wsadowych, zostanie odebrany i przekazany do kanału podrzędnego. Każdy nagłówek wiadomości jest również konwertowany jako lista, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. W przypadku nagłówków wspólnych identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są one prezentowane jako pojedyncza wartość dla całej partii zdarzeń współudzielonych w tej samej partii zdarzeń. Aby uzyskać więcej informacji, zobacz sekcję Nagłówki komunikatów usługi Event Hubs.
Uwaga
Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany tryb RĘCZNEgo punktu kontrolnego.
Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH
i MANUAL
. BATCH
tryb to tryb automatycznego tworzenia punktów kontrolnych umożliwiający sprawdzenie całej partii zdarzeń po ich odebraniu. MANUAL
tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. Gdy jest używany, program Checkpointer zostanie przekazany do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.
Zasady zużywania wsadowego max-size
można określić za pomocą właściwości i max-wait-time
, gdzie max-size
jest to wymagana właściwość, chociaż max-wait-time
jest opcjonalna.
Aby określić strategię zużywania wsadowego, deweloperzy mogą używać EventHubsContainerProperties
jej do konfiguracji. Zapoznaj się z następującą sekcją , aby zapoznać się z przykładem konfigurowania EventHubsContainerProperties
programu .
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfigurowanie
Ten starter udostępnia następujące 3 części opcji konfiguracji:
Właściwości konfiguracji Połączenie ion
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.
Uwaga
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Połączenie ion konfigurowalne właściwości spring-cloud-azure-starter-integration-eventhubs:
Właściwość | Type | Opis |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Określa, czy usługa Azure Event Hubs jest włączona. |
spring.cloud.azure.eventhubs.connection-string | String | Przestrzeń nazw usługi Event Hubs parametry połączenia wartość. |
spring.cloud.azure.eventhubs.namespace | String | Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Niestandardowy adres punktu końcowego. |
spring.cloud.azure.eventhubs.shared-connection | Wartość logiczna | Czy bazowe klasy EventProcessorClient i EventHubProducerAsyncClient używają tego samego połączenia. Domyślnie tworzone jest nowe połączenie i używane dla każdego utworzonego klienta centrum zdarzeń. |
Właściwości konfiguracji punktu kontrolnego
Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.
Uwaga
W wersji 4.0.0 właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie.
Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-starter-integration-eventhubs:
Właściwość | Type | Opis |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Wartość logiczna | Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Nazwa konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Klucz dostępu do konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Nazwa kontenera magazynu. |
Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloud i można je skonfigurować za pomocą ujednoliconego prefiksu lub prefiksu spring.cloud.azure.
spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Właściwości konfiguracji procesora centrum zdarzeń
Funkcja EventHubsInboundChannelAdapter
używa elementu EventProcessorClient
do korzystania z komunikatów z centrum zdarzeń w celu skonfigurowania ogólnych właściwości EventProcessorClient
programu , których deweloperzy mogą używać EventHubsContainerProperties
do konfiguracji. Zapoznaj się z następującą sekcją dotyczącą pracy z EventHubsInboundChannelAdapter
programem .
Podstawowy sposób użycia
Wysyłanie komunikatów do usługi Azure Event Hubs
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Uwaga
Dozwolone tenant-id
wartości to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoli,EventHubsTemplate
aby wysyłać komunikaty do usługi Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Odbieranie komunikatów z usługi Azure Event Hubs
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
EventHubsInboundChannelAdapter
za pomocą fasoliEventHubsMessageListenerContainer
, aby odbierać komunikaty z usługi Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Utwórz powiązanie odbiorcy komunikatów za pomocą elementu EventHubsInboundChannelAdapter za pośrednictwem utworzonego wcześniej kanału komunikatów.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurowanie obiektu EventHubsMessageConverter w celu dostosowania obiektuMapper
EventHubsMessageConverter
jest konfigurowalny fasola, aby umożliwić użytkownikom dostosowywanie obiektu ObjectMapper.
Obsługa konsumentów usługi Batch
Aby korzystać z komunikatów z usługi Event Hubs w partiach, jest podobne do powyższego przykładu, oprócz tego użytkownicy powinni ustawić opcje konfiguracji związane z wsadem dla EventHubsInboundChannelAdapter
elementu .
Podczas tworzenia EventHubsInboundChannelAdapter
należy ustawić tryb odbiornika jako BATCH
. Podczas tworzenia fasoli programu ustaw tryb punktu kontrolnego EventHubsMessageListenerContainer
jako MANUAL
lub BATCH
, a opcje wsadowe można skonfigurować zgodnie z potrzebami.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Nagłówki komunikatów usługi Event Hubs
W poniższej tabeli przedstawiono sposób mapowania właściwości komunikatów usługi Event Hubs na nagłówki komunikatów spring. W przypadku usługi Azure Event Hubs komunikat jest wywoływany jako event
.
Mapowanie między komunikatami usługi Event Hubs/właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika rekordu:
Właściwości zdarzeń usługi Event Hubs | Stałe nagłówka komunikatów spring | Type | Opis |
---|---|---|---|
Czas w kolejce | EventHubsHeaders#ENQUEUED_TIME | Błyskawiczne | Moment, w formacie UTC, kiedy zdarzenie zostało w kolejce w partycji centrum zdarzeń. |
Przesunięcie | EventHubsHeaders#OFFSET | Długi | Przesunięcie zdarzenia, gdy zostało odebrane z skojarzonej partycji centrum zdarzeń. |
Klucz partycji | AzureHeaders#PARTITION_KEY | String | Klucz wyznaczania wartości skrótu partycji, jeśli został ustawiony podczas pierwotnego publikowania zdarzenia. |
Identyfikator partycji | AzureHeaders#RAW_PARTITION_ID | String | Identyfikator partycji centrum zdarzeń. |
Numer sekwencyjny | EventHubsHeaders#SEQUENCE_NUMBER | Długi | Numer sekwencji przypisany do zdarzenia, gdy został on w kolejce w skojarzonej partycji centrum zdarzeń. |
Ostatnie właściwości zdarzeń w kolejce | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Właściwości ostatniego zdarzenia w kolejce w tej partycji. |
NA | AzureHeaders#CHECKPOINTER | Checkpointer | Nagłówek punktu kontrolnego dla określonego komunikatu. |
Użytkownicy mogą analizować nagłówki komunikatów dla powiązanych informacji o każdym zdarzeniu. Aby ustawić nagłówek komunikatu dla zdarzenia, wszystkie dostosowane nagłówki zostaną umieszczone jako właściwość aplikacji zdarzenia, gdzie nagłówek jest ustawiony jako klucz właściwości. Po odebraniu zdarzeń z usługi Event Hubs wszystkie właściwości aplikacji zostaną przekonwertowane na nagłówek komunikatu.
Uwaga
Nagłówki komunikatów klucza partycji, czas w kolejce, przesunięcie i numer sekwencji nie są obsługiwane ręcznie.
Po włączeniu trybu batch-consumer określone nagłówki wsadowych komunikatów są wymienione poniżej, który zawiera listę wartości z każdego pojedynczego zdarzenia usługi Event Hubs.
Mapowanie między komunikatami usługi Event Hubs/ właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika usługi Batch:
Właściwości zdarzeń usługi Event Hubs | Stałe nagłówka komunikatów spring batch | Type | Opis |
---|---|---|---|
Czas w kolejce | EventHubsHeaders#ENQUEUED_TIME | Lista błyskawicznych | Lista wystąpień w formacie UTC, kiedy każde zdarzenie zostało w kolejce w partycji centrum zdarzeń. |
Przesunięcie | EventHubsHeaders#OFFSET | Lista długich | Lista przesunięcia każdego zdarzenia, które zostało odebrane ze skojarzonej partycji centrum zdarzeń. |
Klucz partycji | AzureHeaders#PARTITION_KEY | Lista ciągów | Lista klucza wyznaczania wartości skrótu partycji, jeśli została ustawiona podczas pierwotnego publikowania każdego zdarzenia. |
Numer sekwencyjny | EventHubsHeaders#SEQUENCE_NUMBER | Lista długich | Lista numerów sekwencji przypisanych do każdego zdarzenia w kolejce w skojarzonej partycji centrum zdarzeń. |
Właściwości systemu | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lista mapy | Lista właściwości systemowych każdego zdarzenia. |
Właściwości aplikacji | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lista mapy | Lista właściwości aplikacji każdego zdarzenia, w którym są umieszczane wszystkie dostosowane nagłówki komunikatów lub właściwości zdarzenia. |
Uwaga
Podczas publikowania komunikatów wszystkie powyższe nagłówki wsadowe zostaną usunięte z komunikatów, jeśli istnieją.
Przykłady
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.
Integracja platformy Spring z usługą Azure Service Bus
Najważniejsze pojęcia
Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych.
Projekt rozszerzenia Spring Integration for Azure Service Bus udostępnia karty kanału przychodzącego i wychodzącego dla usługi Azure Service Bus.
Uwaga
Interfejsy API obsługi completableFuture zostały wycofane z wersji 2.10.0 i zostały zastąpione przez Reactor Core z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfigurowanie
Ten starter udostępnia następujące 2 części opcji konfiguracji:
właściwości konfiguracji Połączenie ion
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.
Uwaga
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Połączenie ion konfigurowalne właściwości rozwiązania spring-cloud-azure-starter-integration-servicebus:
Właściwość | Type | Opis |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Określa, czy usługa Azure Service Bus jest włączona. |
spring.cloud.azure.servicebus.connection-string | String | Przestrzeń nazw usługi Service Bus parametry połączenia wartość. |
spring.cloud.azure.servicebus.namespace | String | Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus. |
Właściwości konfiguracji procesora usługi Service Bus
Funkcja ServiceBusInboundChannelAdapter
używa elementu ServiceBusProcessorClient
do korzystania z komunikatów ServiceBusProcessorClient
w celu skonfigurowania ogólnych właściwości elementu , których deweloperzy mogą używać ServiceBusContainerProperties
do konfiguracji. Zapoznaj się z następującą sekcją dotyczącą pracy z ServiceBusInboundChannelAdapter
programem .
Podstawowy sposób użycia
Wysyłanie komunikatów do usługi Azure Service Bus
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Uwaga
Dozwolone tenant-id
wartości to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Uwaga
Dozwolone tenant-id
wartości to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoli,ServiceBusTemplate
aby wysyłać komunikaty do usługi Service Bus, ustaw typ jednostki serviceBusTemplate. W tym przykładzie kolejka usługi Service Bus jest przykładowa.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Odbieranie komunikatów z usługi Azure Service Bus
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
ServiceBusInboundChannelAdapter
za pomocą fasoliServiceBusMessageListenerContainer
, aby odbierać komunikaty do usługi Service Bus. W tym przykładzie kolejka usługi Service Bus jest przykładowa.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Utwórz powiązanie
ServiceBusInboundChannelAdapter
odbiorcy komunikatów za pośrednictwem utworzonego wcześniej kanału komunikatów.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurowanie obiektu ServiceBusMessageConverter w celu dostosowania obiektuMapper
ServiceBusMessageConverter
element jest konfigurowalny, aby umożliwić użytkownikom dostosowywanie ObjectMapper
elementu .
Nagłówki komunikatów usługi Service Bus
W przypadku niektórych nagłówków usługi Service Bus, które mogą być mapowane na wiele stałych nagłówków Spring, jest wyświetlany priorytet różnych nagłówków Spring.
Mapowanie między nagłówkami usługi Service Bus i nagłówkami spring:
Nagłówki i właściwości komunikatów usługi Service Bus | Stałe nagłówka wiadomości spring | Typ | Konfigurowalny | opis |
---|---|---|---|---|
Typ zawartości | MessageHeaders#CONTENT_TYPE |
String | Tak | Deskryptor RFC2045 typu zawartości komunikatu. |
Identyfikator korelacji | ServiceBusMessageHeaders#CORRELATION_ID |
String | Tak | Identyfikator korelacji komunikatu |
Identyfikator komunikatu | ServiceBusMessageHeaders#MESSAGE_ID |
String | Tak | Identyfikator wiadomości, ten nagłówek ma wyższy priorytet niż MessageHeaders#ID . |
Identyfikator komunikatu | MessageHeaders#ID |
UUID | Tak | Identyfikator komunikatu wiadomości, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#MESSAGE_ID . |
Klucz partycji | ServiceBusMessageHeaders#PARTITION_KEY |
String | Tak | Klucz partycji do wysyłania komunikatu do jednostki podzielonej na partycje. |
Odpowiedz | MessageHeaders#REPLY_CHANNEL |
String | Tak | Adres jednostki do wysyłania odpowiedzi. |
Odpowiedz na identyfikator sesji | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Tak | Wartość właściwości ReplyToGroupId wiadomości. |
Zaplanowana godzina kolejki utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Tak | Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma wyższy priorytet niż AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Zaplanowana godzina kolejki utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Liczba całkowita | Tak | Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Identyfikator sesji | ServiceBusMessageHeaders#SESSION_ID |
String | Tak | Identyfikator sesji dla jednostki obsługującej sesję. |
Time to live (Czas wygaśnięcia) | ServiceBusMessageHeaders#TIME_TO_LIVE |
Czas trwania | Tak | Czas trwania przed wygaśnięciem tej wiadomości. |
To | ServiceBusMessageHeaders#TO |
String | Tak | Adres "do" komunikatu, zarezerwowany do użycia w przyszłości w scenariuszach routingu i obecnie ignorowany przez samego brokera. |
Subject | ServiceBusMessageHeaders#SUBJECT |
String | Tak | Temat wiadomości. |
Opis błędu utraconych komunikatów | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | Nie | Opis komunikatu, który został utracony. |
Przyczyna utraconych listów | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | Nie | Powodem, dla którego wiadomość została martwa. |
Źródło utraconych listów | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | Nie | Jednostka, w której wiadomość została martwa. |
Liczba dostaw | ServiceBusMessageHeaders#DELIVERY_COUNT |
długi | Nie. | Liczba przypadków dostarczenia tego komunikatu do klientów. |
Numer sekwencji w kolejce | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
długi | Nie. | Numer sekwencji w kolejce przypisany do komunikatu przez usługę Service Bus. |
Czas w kolejce | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nie. | Data/godzina, w której ten komunikat został w kolejce w usłudze Service Bus. |
Wygasa o godzinie | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nie. | Data/godzina wygaśnięcia tej wiadomości. |
Blokowanie tokenu | ServiceBusMessageHeaders#LOCK_TOKEN |
String | Nie | Token blokady dla bieżącego komunikatu. |
Zablokowane do | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nie. | Data/godzina wygaśnięcia blokady tej wiadomości. |
Numer sekwencyjny | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
długi | Nie. | Unikatowy numer przypisany do komunikatu przez usługę Service Bus. |
Stan | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nie. | Stan komunikatu, który może być aktywny, odroczony lub zaplanowany. |
Obsługa klucza partycji
Ten starter obsługuje partycjonowanie usługi Service Bus, zezwalając na ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.
Zalecane: użyj ServiceBusMessageHeaders.PARTITION_KEY
jako klucza nagłówka.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Niezalecane, ale obecnie obsługiwane:AzureHeaders.PARTITION_KEY
jako klucz nagłówka.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Uwaga
Jeśli obie ServiceBusMessageHeaders.PARTITION_KEY
opcje i AzureHeaders.PARTITION_KEY
są ustawione w nagłówkach komunikatów, ServiceBusMessageHeaders.PARTITION_KEY
jest preferowana.
Obsługa sesji
W tym przykładzie pokazano, jak ręcznie ustawić identyfikator sesji komunikatu w aplikacji.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Uwaga
Po ustawieniu elementu ServiceBusMessageHeaders.SESSION_ID
w nagłówkach wiadomości i ustawieniu innego ServiceBusMessageHeaders.PARTITION_KEY
nagłówka wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.
Przykłady
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.
Integracja platformy Spring z kolejką usługi Azure Storage
Najważniejsze pojęcia
Azure Queue Storage to usługa służąca do przechowywania dużej liczby komunikatów. Uzyskujesz dostęp do komunikatów z dowolnego miejsca na świecie za pośrednictwem uwierzytelnionych wywołań przy użyciu protokołu HTTP lub HTTPS. Komunikat kolejki może mieć rozmiar do 64 KB. Kolejka może zawierać miliony komunikatów do całkowitego limitu pojemności konta magazynu. Kolejki są często używane do tworzenia listy prac w celu asynchronicznego przetwarzania.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfigurowanie
Ten starter udostępnia następujące opcje konfiguracji:
właściwości konfiguracji Połączenie ion
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z kolejką usługi Azure Storage.
Uwaga
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzowanie dostępu za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Połączenie ion konfigurowalne właściwości spring-cloud-azure-starter-integration-storage-queue:
Właściwość | Type | Opis |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolean | Określa, czy kolejka usługi Azure Storage jest włączona. |
spring.cloud.azure.storage.queue.connection-string | String | Przestrzeń nazw kolejki magazynu parametry połączenia wartość. |
spring.cloud.azure.storage.queue.accountName | String | Nazwa konta kolejki usługi Storage. |
spring.cloud.azure.storage.queue.accountKey | String | Klucz konta kolejki usługi Storage. |
spring.cloud.azure.storage.queue.endpoint | String | Punkt końcowy usługi Kolejka magazynu. |
spring.cloud.azure.storage.queue.sasToken | String | Poświadczenia tokenu sygnatury dostępu współdzielonego |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion używana podczas tworzenia żądań interfejsu API. |
spring.cloud.azure.storage.queue.messageEncoding | String | Kodowanie komunikatów w kolejce. |
Podstawowy sposób użycia
Wysyłanie komunikatów do kolejki usługi Azure Storage
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametry połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Uwaga
Dozwolone tenant-id
wartości to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Uwaga
Dozwolone tenant-id
wartości to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użycie nieprawidłowego punktu końcowego (konta osobiste i konta organizacji) w sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje w dzierżawie. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Konwertowanie aplikacji z jedną dzierżawą na wielodostępny w usłudze Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoli,StorageQueueTemplate
aby wysyłać komunikaty do kolejki magazynu.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Odbieranie komunikatów z kolejki usługi Azure Storage
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
StorageQueueMessageSource
zaStorageQueueTemplate
pomocą fasoli, aby odbierać komunikaty do kolejki magazynu.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Utwórz powiązanie odbiorcy komunikatów za pomocą elementu StorageQueueMessageSource utworzonego w ostatnim kroku za pośrednictwem utworzonego wcześniej kanału komunikatów.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Przykłady
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.