Obsługa platformy Azure platformy Spring dla platformy Spring Integration
Ten artykuł dotyczy:✅ w wersji 4.19.0 ✅ w wersji 5.19.0
Rozszerzenie Spring Integration Extension dla platformy Azure udostępnia karty integracji spring dla różnych usług udostępnianych przez zestaw Azure SDK dla języka Java. Zapewniamy obsługę integracji spring dla tych usług platformy Azure: Event Hubs, Service Bus, Storage Queue. Poniżej znajduje się lista obsługiwanych kart:
-
spring-cloud-azure-starter-integration-eventhubs
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Service Bus (Integracja Spring z usługą Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
— aby uzyskać więcej informacji, zobacz Spring Integration with Azure Storage Queue
Integracja platformy Spring z usługą Azure Event Hubs
Kluczowe pojęcia
Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Może odbierać i przetwarzać miliony zdarzeń na sekundę. Dane wysyłane do centrum zdarzeń można przekształcać i przechowywać przy użyciu dowolnego dostawcy analizy w czasie rzeczywistym lub kart wsadowych/magazynowych.
Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych. Te karty zapewniają wyższy poziom abstrakcji dzięki obsłudze komunikacji wirtualnej, obsługi komunikatów i planowania platformy Spring. Projekt rozszerzenia Spring Integration for Event Hubs udostępnia karty i bramy kanału przychodzącego i wychodzącego dla usługi Azure Event Hubs.
Nuta
Interfejsy API obsługi RxJava są porzucane z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.
Grupa odbiorców
Usługa Event Hubs zapewnia podobną obsługę grupy odbiorców jako platformę Apache Kafka, ale z niewielką inną logiką. Podczas gdy platforma Kafka przechowuje wszystkie zatwierdzone przesunięcia w brokerze, musisz przechowywać przesunięcia komunikatów usługi Event Hubs przetwarzanych ręcznie. Zestaw SDK usługi Event Hubs udostępnia funkcję do przechowywania takich przesunięć w usłudze Azure Storage.
Obsługa partycjonowania
Usługa Event Hubs udostępnia podobną koncepcję partycji fizycznej jako platformy Kafka. Jednak w przeciwieństwie do automatycznego równoważenia obciążenia platformy Kafka między użytkownikami i partycjami usługa Event Hubs zapewnia rodzaj trybu wyprzedzania. Konto magazynu działa jako dzierżawa, aby określić, która partycja jest własnością użytkownika. Po uruchomieniu nowego użytkownika spróbuje ukraść niektóre partycje od najbardziej obciążonych użytkowników w celu osiągnięcia równoważenia obciążenia.
Aby określić strategię równoważenia obciążenia, deweloperzy mogą używać EventHubsContainerProperties
dla konfiguracji. Zobacz poniższej sekcji przykład konfigurowania EventHubsContainerProperties
.
Obsługa konsumentów usługi Batch
EventHubsInboundChannelAdapter
obsługuje tryb przetwarzania wsadowego. Aby ją włączyć, użytkownicy mogą określić tryb odbiornika jako ListenerMode.BATCH
podczas konstruowania wystąpienia EventHubsInboundChannelAdapter
.
Po włączeniu komunikat, którego ładunek jest listą zdarzeń wsadowych, zostanie odebrana i przekazana do kanału podrzędnego. Każdy nagłówek wiadomości jest również konwertowany jako lista, z której zawartość jest skojarzona wartość nagłówka analizowana z każdego zdarzenia. W przypadku nagłówków wspólnych identyfikatora partycji, modułu kontrolnego i ostatnio w kolejce właściwości są one prezentowane jako pojedyncza wartość dla całej partii zdarzeń współudzielonych w tej samej partii zdarzeń. Aby uzyskać więcej informacji, zobacz sekcję Nagłówki komunikatów usługi Event Hubs.
Nuta
Nagłówek punktu kontrolnego istnieje tylko wtedy, gdy jest używany ręczny tryb punktu kontrolnego.
Punkt kontrolny odbiorcy wsadowego obsługuje dwa tryby: BATCH
i MANUAL
.
BATCH
tryb jest trybem automatycznego tworzenia punktów kontrolnych w celu sprawdzenia całej partii zdarzeń jednocześnie po ich odebraniu.
MANUAL
tryb polega na określeniu punktów kontrolnych zdarzeń przez użytkowników. W przypadku użycia Checkpointer zostaną przekazane do nagłówka wiadomości, a użytkownicy mogą używać go do tworzenia punktów kontrolnych.
Zasady zużywania wsadowego mogą być określane przez właściwości max-size
i max-wait-time
, gdzie max-size
jest niezbędną właściwością, podczas gdy max-wait-time
jest opcjonalna.
Aby określić strategię zużywania wsadowego, deweloperzy mogą używać EventHubsContainerProperties
dla konfiguracji. Zobacz poniższej sekcji przykład konfigurowania EventHubsContainerProperties
.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguracja
Ten starter udostępnia następujące 3 części opcji konfiguracji:
Właściwości konfiguracji połączenia
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Event Hubs.
Nuta
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolowski | Określa, czy usługa Azure Event Hubs jest włączona. |
spring.cloud.azure.eventhubs.connection-string | Struna | Wartość parametrów połączenia przestrzeni nazw usługi Event Hubs. |
spring.cloud.azure.eventhubs.namespace | Struna | Wartość przestrzeni nazw usługi Event Hubs, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Struna | Nazwa domeny wartości przestrzeni nazw usługi Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Struna | Niestandardowy adres punktu końcowego. |
spring.cloud.azure.eventhubs.shared-connection | Boolowski | Czy bazowe klasy EventProcessorClient i EventHubProducerAsyncClient używają tego samego połączenia. Domyślnie tworzone jest nowe połączenie i używane dla każdego utworzonego klienta centrum zdarzeń. |
Właściwości konfiguracji punktu kontrolnego
Ta sekcja zawiera opcje konfiguracji usługi Storage Blobs Service, która jest używana do utrwalania własności partycji i informacji o punkcie kontrolnym.
Nuta
W wersji 4.0.0, jeśli właściwość spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nie jest włączona ręcznie, żaden kontener magazynu nie zostanie utworzony automatycznie.
Tworzenie punktów kontrolnych konfigurowalnych właściwości spring-cloud-azure-starter-integration-eventhubs:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolowski | Czy zezwolić na tworzenie kontenerów, jeśli nie istnieje. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Struna | Nazwa konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Struna | Klucz dostępu do konta magazynu. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Struna | Nazwa kontenera magazynu. |
Typowe opcje konfiguracji zestawu SDK usługi Azure Service można również konfigurować dla magazynu punktów kontrolnych obiektów blob usługi Storage. Obsługiwane opcje konfiguracji są wprowadzane w konfiguracji platformy Azure Spring Cloudi można je skonfigurować za pomocą ujednoliconego prefiksu spring.cloud.azure.
lub prefiksu spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Właściwości konfiguracji procesora centrum zdarzeń
EventHubsInboundChannelAdapter
używa EventProcessorClient
do korzystania z komunikatów z centrum zdarzeń, aby skonfigurować ogólne właściwości EventProcessorClient
, deweloperzy mogą używać EventHubsContainerProperties
do konfiguracji. Zobacz poniższej sekcji o sposobie pracy z EventHubsInboundChannelAdapter
.
Podstawowe użycie
Wysyłanie komunikatów do usługi Azure Event Hubs
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoliEventHubsTemplate
do wysyłania komunikatów do usługi Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Odbieranie komunikatów z usługi Azure Event Hubs
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
EventHubsInboundChannelAdapter
przy użyciu fasoliEventHubsMessageListenerContainer
w celu odbierania komunikatów z usługi Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Utwórz powiązanie odbiorcy komunikatów za pomocą elementu EventHubsInboundChannelAdapter za pośrednictwem utworzonego wcześniej kanału komunikatów.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurowanie obiektu EventHubsMessageConverter w celu dostosowania obiektuMapper
EventHubsMessageConverter
jest stosowana jako konfigurowalna fasola umożliwiająca użytkownikom dostosowywanie obiektuMapper.
Obsługa konsumentów usługi Batch
Aby korzystać z komunikatów z usługi Event Hubs w partiach, podobnie jak w przypadku powyższego przykładu, oprócz tego użytkownicy powinni ustawić opcje konfiguracji związane z przetwarzaniem wsadowym dla EventHubsInboundChannelAdapter
.
Podczas tworzenia EventHubsInboundChannelAdapter
tryb odbiornika powinien być ustawiony jako BATCH
. Podczas tworzenia fasoli EventHubsMessageListenerContainer
ustaw tryb punktu kontrolnego jako MANUAL
lub BATCH
, a opcje wsadowe można skonfigurować zgodnie z potrzebami.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Nagłówki komunikatów usługi Event Hubs
W poniższej tabeli przedstawiono sposób mapowania właściwości komunikatów usługi Event Hubs na nagłówki komunikatów spring. W przypadku usługi Azure Event Hubs komunikat jest wywoływany jako event
.
Mapowanie między komunikatami usługi Event Hubs/właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika rekordu:
Właściwości zdarzeń usługi Event Hubs | Stałe nagłówka komunikatów spring | Typ | Opis |
---|---|---|---|
Czas w kolejce | EventHubsHeaders#ENQUEUED_TIME | Chwila | Moment, w formacie UTC, kiedy zdarzenie zostało w kolejce w partycji centrum zdarzeń. |
Przesunięcie | EventHubsHeaders#OFFSET | Długi | Przesunięcie zdarzenia, gdy zostało odebrane z skojarzonej partycji centrum zdarzeń. |
Klucz partycji | AzureHeaders#PARTITION_KEY | Struna | Klucz wyznaczania wartości skrótu partycji, jeśli został ustawiony podczas pierwotnego publikowania zdarzenia. |
Identyfikator partycji | AzureHeaders#RAW_PARTITION_ID | Struna | Identyfikator partycji centrum zdarzeń. |
Numer sekwencji | EventHubsHeaders#SEQUENCE_NUMBER | Długi | Numer sekwencji przypisany do zdarzenia, gdy został on w kolejce w skojarzonej partycji centrum zdarzeń. |
Ostatnie właściwości zdarzeń w kolejce | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Właściwości ostatniego zdarzenia w kolejce w tej partycji. |
NIE | AzureHeaders#CHECKPOINTER | Checkpointer | Nagłówek punktu kontrolnego dla określonego komunikatu. |
Użytkownicy mogą analizować nagłówki komunikatów dla powiązanych informacji o każdym zdarzeniu. Aby ustawić nagłówek komunikatu dla zdarzenia, wszystkie dostosowane nagłówki zostaną umieszczone jako właściwość aplikacji zdarzenia, gdzie nagłówek jest ustawiony jako klucz właściwości. Po odebraniu zdarzeń z usługi Event Hubs wszystkie właściwości aplikacji zostaną przekonwertowane na nagłówek komunikatu.
Nuta
Nagłówki komunikatów klucza partycji, czas w kolejce, przesunięcie i numer sekwencji nie są obsługiwane ręcznie.
Po włączeniu trybu batch-consumer określone nagłówki wsadowych komunikatów są wymienione poniżej, który zawiera listę wartości z każdego pojedynczego zdarzenia usługi Event Hubs.
Mapowanie między komunikatami usługi Event Hubs/ właściwościami zdarzeń i nagłówkami komunikatów spring w trybie odbiornika usługi Batch:
Właściwości zdarzeń usługi Event Hubs | Stałe nagłówka komunikatów spring batch | Typ | Opis |
---|---|---|---|
Czas w kolejce | EventHubsHeaders#ENQUEUED_TIME | Lista błyskawicznych | Lista wystąpień w formacie UTC, kiedy każde zdarzenie zostało w kolejce w partycji centrum zdarzeń. |
Przesunięcie | EventHubsHeaders#OFFSET | Lista długich | Lista przesunięcia każdego zdarzenia, które zostało odebrane ze skojarzonej partycji centrum zdarzeń. |
Klucz partycji | AzureHeaders#PARTITION_KEY | Lista ciągów | Lista klucza wyznaczania wartości skrótu partycji, jeśli została ustawiona podczas pierwotnego publikowania każdego zdarzenia. |
Numer sekwencji | EventHubsHeaders#SEQUENCE_NUMBER | Lista długich | Lista numerów sekwencji przypisanych do każdego zdarzenia w kolejce w skojarzonej partycji centrum zdarzeń. |
Właściwości systemu | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lista mapy | Lista właściwości systemowych każdego zdarzenia. |
Właściwości aplikacji | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lista mapy | Lista właściwości aplikacji każdego zdarzenia, w którym są umieszczane wszystkie dostosowane nagłówki komunikatów lub właściwości zdarzenia. |
Nuta
Podczas publikowania komunikatów wszystkie powyższe nagłówki wsadowe zostaną usunięte z komunikatów, jeśli istnieją.
Próbki
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.
Integracja platformy Spring z usługą Azure Service Bus
Kluczowe pojęcia
Integracja spring umożliwia uproszczone przesyłanie komunikatów w aplikacjach opartych na platformie Spring i obsługuje integrację z systemami zewnętrznymi za pośrednictwem kart deklaratywnych.
Projekt rozszerzenia Spring Integration for Azure Service Bus udostępnia karty kanału przychodzącego i wychodzącego dla usługi Azure Service Bus.
Nuta
Interfejsy API obsługi completableFuture zostały wycofane z wersji 2.10.0 i zostały zastąpione przez Reactor Core z wersji 4.0.0. Aby uzyskać szczegółowe informacje, zobacz Javadoc.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguracja
Ten starter udostępnia następujące 2 części opcji konfiguracji:
Właściwości konfiguracji połączenia
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z usługą Azure Service Bus.
Nuta
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-servicebus:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolowski | Określa, czy usługa Azure Service Bus jest włączona. |
spring.cloud.azure.servicebus.connection-string | Struna | Wartość parametrów połączenia przestrzeni nazw usługi Service Bus. |
spring.cloud.azure.servicebus.custom-endpoint-address | Struna | Niestandardowy adres punktu końcowego do użycia podczas nawiązywania połączenia z usługą Service Bus. |
spring.cloud.azure.servicebus.namespace | Struna | Wartość przestrzeni nazw usługi Service Bus, która jest prefiksem nazwy FQDN. Nazwa FQDN powinna składać się z nazwy NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Struna | Nazwa domeny wartości przestrzeni nazw usługi Azure Service Bus. |
Właściwości konfiguracji procesora usługi Service Bus
ServiceBusInboundChannelAdapter
używa ServiceBusProcessorClient
do korzystania z komunikatów, aby skonfigurować ogólne właściwości ServiceBusProcessorClient
, deweloperzy mogą używać ServiceBusContainerProperties
do konfiguracji. Zobacz poniższej sekcji o sposobie pracy z ServiceBusInboundChannelAdapter
.
Podstawowe użycie
Wysyłanie komunikatów do usługi Azure Service Bus
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoliServiceBusTemplate
, aby wysyłać komunikaty do usługi Service Bus, ustaw typ jednostki serviceBusTemplate. W tym przykładzie kolejka usługi Service Bus jest przykładowa.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Odbieranie komunikatów z usługi Azure Service Bus
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
ServiceBusInboundChannelAdapter
przy użyciu fasoliServiceBusMessageListenerContainer
w celu odbierania komunikatów do usługi Service Bus. W tym przykładzie kolejka usługi Service Bus jest przykładowa.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Utwórz powiązanie odbiorcy komunikatów z
ServiceBusInboundChannelAdapter
za pośrednictwem utworzonego wcześniej kanału wiadomości.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurowanie obiektu ServiceBusMessageConverter w celu dostosowania obiektuMapper
ServiceBusMessageConverter
jest stosowana jako konfigurowalna fasola umożliwiająca użytkownikom dostosowywanie ObjectMapper
.
Nagłówki komunikatów usługi Service Bus
W przypadku niektórych nagłówków usługi Service Bus, które mogą być mapowane na wiele stałych nagłówków Spring, jest wyświetlany priorytet różnych nagłówków Spring.
Mapowanie między nagłówkami usługi Service Bus i nagłówkami spring:
Nagłówki i właściwości komunikatów usługi Service Bus | Stałe nagłówka wiadomości spring | Typ | Konfigurowalne | Opis |
---|---|---|---|---|
Typ zawartości | MessageHeaders#CONTENT_TYPE |
Struna | Tak | Deskryptor RFC2045 typu zawartości komunikatu. |
Identyfikator korelacji | ServiceBusMessageHeaders#CORRELATION_ID |
Struna | Tak | Identyfikator korelacji komunikatu |
Identyfikator komunikatu | ServiceBusMessageHeaders#MESSAGE_ID |
Struna | Tak | Identyfikator komunikatu komunikatu, ten nagłówek ma wyższy priorytet niż MessageHeaders#ID . |
Identyfikator komunikatu | MessageHeaders#ID |
Identyfikator UUID | Tak | Identyfikator komunikatu wiadomości, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#MESSAGE_ID . |
Klucz partycji | ServiceBusMessageHeaders#PARTITION_KEY |
Struna | Tak | Klucz partycji do wysyłania komunikatu do jednostki podzielonej na partycje. |
Odpowiedz na | MessageHeaders#REPLY_CHANNEL |
Struna | Tak | Adres jednostki do wysyłania odpowiedzi. |
Odpowiedz na identyfikator sesji | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Struna | Tak | Wartość właściwości ReplyToGroupId wiadomości. |
Zaplanowana godzina kolejki utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Tak | Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma wyższy priorytet niż AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Zaplanowana godzina kolejki utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Liczba całkowita | Tak | Data/godzina, w której komunikat powinien być w kolejce w usłudze Service Bus, ten nagłówek ma niższy priorytet niż ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Identyfikator sesji | ServiceBusMessageHeaders#SESSION_ID |
Struna | Tak | Identyfikator sesji dla jednostki obsługującej sesję. |
Czas wygaśnięcia | ServiceBusMessageHeaders#TIME_TO_LIVE |
Czas trwania | Tak | Czas trwania przed wygaśnięciem tej wiadomości. |
Do | ServiceBusMessageHeaders#TO |
Struna | Tak | Adres "do" komunikatu, zarezerwowany do użycia w przyszłości w scenariuszach routingu i obecnie ignorowany przez samego brokera. |
Temat | ServiceBusMessageHeaders#SUBJECT |
Struna | Tak | Temat wiadomości. |
Opis błędu utraconych komunikatów | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Struna | Nie | Opis komunikatu, który został utracony. |
Przyczyna utraconych listów | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Struna | Nie | Powodem, dla którego wiadomość została martwa. |
Źródło utraconych listów | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Struna | Nie | Jednostka, w której wiadomość została martwa. |
Liczba dostaw | ServiceBusMessageHeaders#DELIVERY_COUNT |
długi | Nie | Liczba przypadków dostarczenia tego komunikatu do klientów. |
Numer sekwencji w kolejce | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
długi | Nie | Numer sekwencji w kolejce przypisany do komunikatu przez usługę Service Bus. |
Czas w kolejce | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nie | Data/godzina, w której ten komunikat został w kolejce w usłudze Service Bus. |
Wygasa o godzinie | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nie | Data/godzina wygaśnięcia tej wiadomości. |
Blokowanie tokenu | ServiceBusMessageHeaders#LOCK_TOKEN |
Struna | Nie | Token blokady dla bieżącego komunikatu. |
Zablokowane do | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nie | Data/godzina wygaśnięcia blokady tej wiadomości. |
Numer sekwencji | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
długi | Nie | Unikatowy numer przypisany do komunikatu przez usługę Service Bus. |
Stan | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nie | Stan komunikatu, który może być aktywny, odroczony lub zaplanowany. |
Obsługa klucza partycji
Ten starter obsługuje partycjonowanie usługi Service Bus, umożliwiając ustawienie klucza partycji i identyfikatora sesji w nagłówku komunikatu. W tej sekcji przedstawiono sposób ustawiania klucza partycji dla komunikatów.
Zalecane: użyj ServiceBusMessageHeaders.PARTITION_KEY
jako klucza nagłówka.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Niezalecane, ale obecnie obsługiwane: AzureHeaders.PARTITION_KEY
jako klucz nagłówka.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nuta
Jeśli obie ServiceBusMessageHeaders.PARTITION_KEY
i AzureHeaders.PARTITION_KEY
są ustawione w nagłówkach komunikatów, preferowane jest ServiceBusMessageHeaders.PARTITION_KEY
.
Obsługa sesji
W tym przykładzie pokazano, jak ręcznie ustawić identyfikator sesji komunikatu w aplikacji.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nuta
Gdy ServiceBusMessageHeaders.SESSION_ID
jest ustawiona w nagłówkach wiadomości, a inny nagłówek ServiceBusMessageHeaders.PARTITION_KEY
jest również ustawiony, wartość identyfikatora sesji zostanie ostatecznie użyta do zastąpienia wartości klucza partycji.
Dostosowywanie właściwości klienta usługi Service Bus
Deweloperzy mogą używać AzureServiceClientBuilderCustomizer
do dostosowywania właściwości klienta usługi Service Bus. Poniższy przykład dostosowuje właściwość sessionIdleTimeout
w ServiceBusClientBuilder
:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Próbki
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.
Integracja platformy Spring z kolejką usługi Azure Storage
Kluczowe pojęcia
Azure Queue Storage to usługa do przechowywania dużej liczby komunikatów. Uzyskujesz dostęp do komunikatów z dowolnego miejsca na świecie za pośrednictwem uwierzytelnionych wywołań przy użyciu protokołu HTTP lub HTTPS. Komunikat kolejki może mieć rozmiar do 64 KB. Kolejka może zawierać miliony komunikatów do całkowitego limitu pojemności konta magazynu. Kolejki są często używane do tworzenia listy prac w celu asynchronicznego przetwarzania.
Konfiguracja zależności
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguracja
Ten starter udostępnia następujące opcje konfiguracji:
Właściwości konfiguracji połączenia
Ta sekcja zawiera opcje konfiguracji używane do nawiązywania połączenia z kolejką usługi Azure Storage.
Nuta
Jeśli zdecydujesz się użyć podmiotu zabezpieczeń do uwierzytelniania i autoryzacji za pomocą identyfikatora Entra firmy Microsoft na potrzeby uzyskiwania dostępu do zasobu platformy Azure, zobacz Autoryzuj dostęp za pomocą identyfikatora Entra firmy Microsoft, aby upewnić się, że podmiot zabezpieczeń otrzymał wystarczające uprawnienia dostępu do zasobu platformy Azure.
Konfigurowalne właściwości połączenia spring-cloud-azure-starter-integration-storage-queue:
Własność | Typ | Opis |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolowski | Określa, czy kolejka usługi Azure Storage jest włączona. |
spring.cloud.azure.storage.queue.connection-string | Struna | Wartość parametrów połączenia przestrzeni nazw kolejki magazynu. |
spring.cloud.azure.storage.queue.accountName | Struna | Nazwa konta kolejki usługi Storage. |
spring.cloud.azure.storage.queue.accountKey | Struna | Klucz konta kolejki usługi Storage. |
spring.cloud.azure.storage.queue.endpoint | Struna | Punkt końcowy usługi Kolejka magazynu. |
spring.cloud.azure.storage.queue.sasToken | Struna | Poświadczenia tokenu sygnatury dostępu współdzielonego |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion używana podczas tworzenia żądań interfejsu API. |
spring.cloud.azure.storage.queue.messageEncoding | Struna | Kodowanie komunikatów w kolejce. |
Podstawowe użycie
Wysyłanie komunikatów do kolejki usługi Azure Storage
Wypełnij opcje konfiguracji poświadczeń.
W przypadku poświadczeń jako parametrów połączenia skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
W przypadku poświadczeń jako tożsamości zarządzanych skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
W przypadku poświadczeń jako jednostki usługi skonfiguruj następujące właściwości w pliku application.yml:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Nuta
Dozwolone wartości dla tenant-id
to: common
, organizations
, consumers
lub identyfikator dzierżawy. Aby uzyskać więcej informacji na temat tych wartości, zobacz sekcję Użyto nieprawidłowego punktu końcowego (kont osobistych i organizacji) sekcji Błąd AADSTS50020 — konto użytkownika od dostawcy tożsamości nie istnieje wdzierżawy. Aby uzyskać informacje na temat konwertowania aplikacji z jedną dzierżawą, zobacz Convert single-tenant app to multitenant on Microsoft Entra ID.
Utwórz
DefaultMessageHandler
za pomocą fasoliStorageQueueTemplate
do wysyłania komunikatów do kolejki magazynu.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Utwórz powiązanie bramy komunikatów z powyższym programem obsługi komunikatów za pośrednictwem kanału komunikatów.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Wysyłanie komunikatów przy użyciu bramy.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Odbieranie komunikatów z kolejki usługi Azure Storage
Wypełnij opcje konfiguracji poświadczeń.
Utwórz fasolę kanału komunikatów jako kanał wejściowy.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Utwórz
StorageQueueMessageSource
za pomocą fasoliStorageQueueTemplate
w celu odbierania komunikatów do kolejki magazynu.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Utwórz powiązanie odbiorcy komunikatów za pomocą elementu StorageQueueMessageSource utworzonego w ostatnim kroku za pośrednictwem utworzonego wcześniej kanału komunikatów.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Próbki
Aby uzyskać więcej informacji, zobacz repozytorium azure-spring-boot-samples w witrynie GitHub.