Spring Cloud Azure-Unterstützung für Spring Integration
Dieser Artikel gilt für:✅ Version 4.19.0 ✅ Version 5.19.0
Spring Integration Extension for Azure stellt Spring Integration Adapter für die verschiedenen Dienste bereit, die vom Azure SDK für Javabereitgestellt werden. Wir bieten Unterstützung für Spring Integration für diese Azure-Dienste: Event Hubs, Service Bus, Storage Queue. Es folgt eine Liste der unterstützten Adapter:
-
spring-cloud-azure-starter-integration-eventhubs
– weitere Informationen finden Sie unter Spring Integration in Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
– weitere Informationen finden Sie unter Spring Integration in Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
– weitere Informationen finden Sie unter Spring Integration in Azure Storage Queue
Federintegration mit Azure Event Hubs
Schlüsselkonzepte
Azure Event Hubs ist eine Big Data Streaming-Plattform und ein Ereignisaufnahmedienst. Sie kann Millionen von Ereignissen pro Sekunde empfangen und verarbeiten. An einen Event Hub gesendete Daten können mithilfe eines beliebigen Echtzeitanalyseanbieters oder Batch-/Speicheradapters transformiert und gespeichert werden.
Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter. Diese Adapter bieten eine höhere Abstraktionsebene über die Unterstützung von Spring für Remoting, Messaging und Planung. Das Spring Integration für Event Hubs Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter und Gateways für Azure Event Hubs.
Anmerkung
RxJava-Support-APIs werden ab Version 4.0.0 gelöscht. Weitere Informationen finden Sie unter Javadoc.
Verbrauchergruppe
Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.
Partitionierungsunterstützung
Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischem Neuausgleich zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welche Partition im Besitz des Verbrauchers ist. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den meisten schwer geladenen Verbrauchern zu stehlen, um den Workloadausgleich zu erreichen.
Um die Lastenausgleichsstrategie anzugeben, können Entwickler EventHubsContainerProperties
für die Konfiguration verwenden. Ein Beispiel zum Konfigurieren EventHubsContainerProperties
.
Batch-Consumerunterstützung
Die EventHubsInboundChannelAdapter
unterstützt den Batch-Nutzungsmodus. Um ihn zu aktivieren, können Benutzer den Listenermodus beim Erstellen einer ListenerMode.BATCH
Instanz als EventHubsInboundChannelAdapter
angeben.
Wenn diese Option aktiviert ist, wird eine Nachricht, deren Nutzlast eine Liste von Batchereignissen ist, empfangen und an den nachgeschalteten Kanal übergeben. Jeder Nachrichtenkopf wird auch als Liste konvertiert, von denen der Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Für die kommunalen Kopfzeilen der Partitions-ID, des Prüfpunkts und der letzten Queued-Eigenschaften werden sie als einzelner Wert für den gesamten Batch von Ereignissen gemeinsam dargestellt. Weitere Informationen finden Sie im Abschnitt Event Hubs-Nachrichtenkopfzeilen.
Anmerkung
Der Prüfpunktheader ist nur vorhanden, wenn MANUELL Prüfpunktmodus verwendet wird.
Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH
und MANUAL
.
BATCH
Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald sie empfangen wurden.
MANUAL
Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Bei Verwendung wird der Prüfpunkter- an den Nachrichtenkopf übergeben, und Benutzer können ihn verwenden, um Prüfpunkte zu erledigen.
Die Batchaufwendungsrichtlinie kann durch Eigenschaften von max-size
und max-wait-time
angegeben werden, wobei max-size
eine erforderliche Eigenschaft ist, während max-wait-time
optional ist.
Um die Strategie für die Batchnutzung anzugeben, können Entwickler EventHubsContainerProperties
für die Konfiguration verwenden. Ein Beispiel zum Konfigurieren EventHubsContainerProperties
.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden drei Konfigurationsoptionen:
Verbindungskonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.
Anmerkung
Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolesch | Gibt an, ob azure Event Hubs aktiviert ist. |
spring.cloud.azure.eventhubs.connection-string | Schnur | Event Hubs Namespace-Verbindungszeichenfolgenwert. |
spring.cloud.azure.eventhubs.namespace | Schnur | Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen. |
spring.cloud.azure.eventhubs.domain-name | Schnur | Domänenname eines Azure Event Hubs-Namespacewerts. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Schnur | Benutzerdefinierte Endpunktadresse. |
spring.cloud.azure.eventhubs.shared-connection | Boolesch | Gibt an, ob der zugrunde liegende EventProcessorClient und EventHubProducerAsyncClient dieselbe Verbindung verwenden. Standardmäßig wird eine neue Verbindung für jeden erstellten Event Hub-Client erstellt und verwendet. |
Prüfpunktkonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.
Anmerkung
Ab Version 4.0.0, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, wird kein Speichercontainer automatisch erstellt.
Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolesch | Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Schnur | Name für das Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Schnur | Zugriffsschlüssel für Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Schnur | Name des Speichercontainers. |
Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure.
oder dem Präfix von spring.cloud.azure.eventhubs.processor.checkpoint-store
konfiguriert werden.
Konfigurationseigenschaften des Event Hub-Prozessors
Die EventHubsInboundChannelAdapter
verwendet die EventProcessorClient
, um Nachrichten von einem Event Hub zu nutzen, um die Gesamteigenschaften eines EventProcessorClient
zu konfigurieren, können Entwickler EventHubsContainerProperties
für die Konfiguration verwenden. Weitere Informationen zum Arbeiten mit EventHubsInboundChannelAdapter
.
Grundlegende Nutzung
Senden von Nachrichten an Azure Event Hubs
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Erstellen Sie
DefaultMessageHandler
mit demEventHubsTemplate
Bean, um Nachrichten an Event Hubs zu senden.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Empfangen von Nachrichten von Azure Event Hubs
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
EventHubsInboundChannelAdapter
mit demEventHubsMessageListenerContainer
Bean, um Nachrichten von Event Hubs zu empfangen.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Erstellen Sie eine Nachrichtenempfängerbindung mit EventHubsInboundChannelAdapter über den zuvor erstellten Nachrichtenkanal.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurieren von EventHubsMessageConverter zum Anpassen von objectMapper
EventHubsMessageConverter
wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapper anpassen können.
Batch-Consumerunterstützung
Um Nachrichten von Event Hubs in Batches zu nutzen, ähnelt es dem obigen Beispiel, neben den Benutzern sollten die batchaufwendigen Konfigurationsoptionen für EventHubsInboundChannelAdapter
festgelegt werden.
Beim Erstellen EventHubsInboundChannelAdapter
sollte der Listenermodus als BATCH
festgelegt werden. Legen Sie beim Erstellen von EventHubsMessageListenerContainer
den Prüfpunktmodus entweder als MANUAL
oder BATCH
fest, und die Batchoptionen können bei Bedarf konfiguriert werden.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs-Nachrichtenkopfzeilen
In der folgenden Tabelle wird veranschaulicht, wie Die Nachrichteneigenschaften von Event Hubs den Kopfzeilen von Spring-Nachrichten zugeordnet werden. Für Azure Event Hubs wird die Nachricht als event
aufgerufen.
Zuordnung zwischen Event Hubs-Nachrichten-/Ereigniseigenschaften und Federnachrichtenkopfzeilen im Datensatzlistenermodus:
Event Hubs-Ereigniseigenschaften | Federnachrichtenkopfkonstanten | Art | Beschreibung |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Augenblick | Der Zeitpunkt, zu dem das Ereignis in UTC in der Event Hub-Partition enqueuiert wurde. |
Offset | EventHubsHeaders#OFFSET | Lang | Der Offset des Ereignisses, als es von der zugeordneten Event Hub-Partition empfangen wurde. |
Partitionsschlüssel | AzureHeaders#PARTITION_KEY | Schnur | Der Partitionshashschlüssel, wenn er beim ursprünglichen Veröffentlichen des Ereignisses festgelegt wurde. |
Partitions-ID | AzureHeaders#RAW_PARTITION_ID | Schnur | Die Partitions-ID des Event Hub. |
Sequenznummer | EventHubsHeaders#SEQUENCE_NUMBER | Lang | Die Sequenznummer, die dem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition enqueuiert wurde. |
Letzte Queued-Ereigniseigenschaften | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Die Eigenschaften des letzten enqueued-Ereignisses in dieser Partition. |
NA | AzureHeaders#CHECKPOINTER | Prüfpunkt | Die Kopfzeile für den Prüfpunkt der jeweiligen Nachricht. |
Benutzer können die Nachrichtenkopfzeilen für die zugehörigen Informationen jedes Ereignisses analysieren. Um einen Nachrichtenkopf für das Ereignis festzulegen, werden alle angepassten Kopfzeilen als Anwendungseigenschaft eines Ereignisses platziert, wobei der Header als Eigenschaftsschlüssel festgelegt wird. Wenn Ereignisse von Event Hubs empfangen werden, werden alle Anwendungseigenschaften in den Nachrichtenkopf konvertiert.
Anmerkung
Nachrichtenkopfzeilen des Partitionsschlüssels, enqueuierte Zeit, Offset- und Sequenznummer werden nicht unterstützt, um manuell festgelegt zu werden.
Wenn der Batch-Consumer-Modus aktiviert ist, werden die spezifischen Kopfzeilen von Batchnachrichten wie folgt aufgelistet, die eine Liste der Werte aus jedem einzelnen Event Hubs-Ereignis enthalten.
Zuordnung zwischen Event Hubs Message/Event Properties und Spring Message Headers im Batchlistener-Modus:
Event Hubs-Ereigniseigenschaften | Spring Batch-Nachrichtenkopfkonstanten | Art | Beschreibung |
---|---|---|---|
Enqueued time | EventHubsHeaders#ENQUEUED_TIME | Liste der Sofortnachrichten | Liste der sofortigen Ereignisse in UTC, wann jedes Ereignis in der Event Hub-Partition enqueuiert wurde. |
Offset | EventHubsHeaders#OFFSET | Liste der langen | Liste des Offsets jedes Ereignisses, wenn es von der zugeordneten Event Hub-Partition empfangen wurde. |
Partitionsschlüssel | AzureHeaders#PARTITION_KEY | Liste der Zeichenfolgen | Liste des Partitionshashingschlüssels, wenn er beim ursprünglichen Veröffentlichen jedes Ereignisses festgelegt wurde. |
Sequenznummer | EventHubsHeaders#SEQUENCE_NUMBER | Liste der langen | Liste der Sequenznummer, die jedem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition abgefragt wurde. |
Systemeigenschaften | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Liste der Karte | Liste der Systemeigenschaften jedes Ereignisses. |
Anwendungseigenschaften | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Liste der Karte | Liste der Anwendungseigenschaften jedes Ereignisses, in denen alle angepassten Nachrichtenkopfzeilen oder Ereigniseigenschaften platziert werden. |
Anmerkung
Bei der Veröffentlichung von Nachrichten werden alle oben genannten Batchheader aus den Nachrichten entfernt, sofern vorhanden.
Proben
Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.
Federintegration mit Azure Service Bus
Schlüsselkonzepte
Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter.
Das Spring Integration for Azure Service Bus-Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter für Azure Service Bus.
Anmerkung
CompletableFuture-Support-APIs sind ab Version 2.10.0 veraltet und werden von Reaktorkern ab Version 4.0.0 ersetzt. Weitere Informationen finden Sie unter Javadoc.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden 2 Teile der Konfigurationsoptionen:
Verbindungskonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.
Anmerkung
Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-servicebus:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolesch | Gibt an, ob ein Azure Service Bus aktiviert ist. |
spring.cloud.azure.servicebus.connection-string | Schnur | Service Bus-Namespace-Verbindungszeichenfolgenwert. |
spring.cloud.azure.servicebus.custom-endpoint-address | Schnur | Die benutzerdefinierte Endpunktadresse, die beim Herstellen einer Verbindung mit Service Bus verwendet werden soll. |
spring.cloud.azure.servicebus.namespace | Schnur | Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen. |
spring.cloud.azure.servicebus.domain-name | Schnur | Domänenname eines Azure Service Bus-Namespacewerts. |
Konfigurationseigenschaften des Dienstbusprozessors
Die ServiceBusInboundChannelAdapter
verwendet die ServiceBusProcessorClient
, um Nachrichten zu nutzen, um die Gesamteigenschaften eines ServiceBusProcessorClient
zu konfigurieren, können Entwickler ServiceBusContainerProperties
für die Konfiguration verwenden. Weitere Informationen zum Arbeiten mit ServiceBusInboundChannelAdapter
.
Grundlegende Nutzung
Senden von Nachrichten an Azure Service Bus
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Erstellen Sie
DefaultMessageHandler
mit demServiceBusTemplate
Bean, um Nachrichten an Service Bus zu senden, und legen Sie den Entitätstyp für die ServiceBusTemplate fest. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Empfangen von Nachrichten von Azure Service Bus
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
ServiceBusInboundChannelAdapter
mit demServiceBusMessageListenerContainer
Bean, um Nachrichten an Service Bus zu empfangen. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Erstellen Sie eine Nachrichtenempfängerbindung mit
ServiceBusInboundChannelAdapter
über den zuvor erstellten Nachrichtenkanal.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Konfigurieren von ServiceBusMessageConverter zum Anpassen von objectMapper
ServiceBusMessageConverter
wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapper
anpassen können.
Dienstbus-Nachrichtenkopfzeilen
Bei einigen ServiceBus-Headern, die mehreren Springheaderkonstanten zugeordnet werden können, wird die Priorität verschiedener Federheader aufgelistet.
Zuordnung zwischen ServiceBus-Headern und Federkopfzeilen:
Service Bus-Nachrichtenkopfzeilen und -eigenschaften | Federnachrichtenkopfkonstanten | Art | Konfigurierbar | Beschreibung |
---|---|---|---|---|
Inhaltstyp | MessageHeaders#CONTENT_TYPE |
Schnur | Ja | Der RFC2045 Inhaltstypdeskriptor der Nachricht. |
Korrelations-ID | ServiceBusMessageHeaders#CORRELATION_ID |
Schnur | Ja | Die Korrelations-ID der Nachricht |
Nachrichten-ID | ServiceBusMessageHeaders#MESSAGE_ID |
Schnur | Ja | Die Nachrichten-ID der Nachricht, diese Kopfzeile hat höhere Priorität als MessageHeaders#ID . |
Nachrichten-ID | MessageHeaders#ID |
UUID | Ja | Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine niedrigere Priorität als ServiceBusMessageHeaders#MESSAGE_ID . |
Partitionsschlüssel | ServiceBusMessageHeaders#PARTITION_KEY |
Schnur | Ja | Der Partitionsschlüssel zum Senden der Nachricht an eine partitionierte Entität. |
Antworten auf | MessageHeaders#REPLY_CHANNEL |
Schnur | Ja | Die Adresse einer Entität, an die Antworten gesendet werden sollen. |
Antworten auf sitzungs-ID | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Schnur | Ja | Der Wert der ReplyToGroupId-Eigenschaft der Nachricht. |
Geplante Queue-Zeit utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ja | Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine höhere Priorität als AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Geplante Queue-Zeit utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Ganze Zahl | Ja | Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine niedrigere Priorität als ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Sitzungs-ID | ServiceBusMessageHeaders#SESSION_ID |
Schnur | Ja | Der Sitzungs-IDentifier für eine sitzungsfähige Entität. |
Zeit zum Leben | ServiceBusMessageHeaders#TIME_TO_LIVE |
Dauer | Ja | Die Dauer, bis diese Nachricht abläuft. |
An | ServiceBusMessageHeaders#TO |
Schnur | Ja | Die "to"-Adresse der Nachricht, die für die zukünftige Verwendung in Routingszenarien reserviert ist und derzeit vom Broker selbst ignoriert wird. |
Betreff | ServiceBusMessageHeaders#SUBJECT |
Schnur | Ja | Der Betreff für die Nachricht. |
Beschreibung des Fehlers "Inaktiver Buchstabe" | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Schnur | Nein | Die Beschreibung für eine Nachricht, die inaktiv war. |
Grund für inaktive Buchstaben | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Schnur | Nein | Der Grund, warum eine Nachricht in totschreibend war. |
Quelle für inaktive Buchstaben | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Schnur | Nein | Die Entität, in der die Nachricht inaktiv war. |
Lieferanzahl | ServiceBusMessageHeaders#DELIVERY_COUNT |
lang | Nein | Die Häufigkeit, mit der diese Nachricht an Clients übermittelt wurde. |
Nummer der Enqueued-Sequenz | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
lang | Nein | Die queuierte Sequenznummer, die einer Nachricht von Service Bus zugewiesen ist. |
Enqueued time | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nein | Das Datum, zu dem diese Nachricht in Service Bus enqueuiert wurde. |
Läuft ab bei | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nein | Das Datum, zu dem diese Nachricht abläuft. |
Sperrtoken | ServiceBusMessageHeaders#LOCK_TOKEN |
Schnur | Nein | Das Sperrtoken für die aktuelle Nachricht. |
Gesperrt bis | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nein | Das Datum, zu dem die Sperre dieser Nachricht abläuft. |
Sequenznummer | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
lang | Nein | Die eindeutige Nummer, die einer Nachricht von Service Bus zugewiesen ist. |
Zustand | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nein | Der Status der Nachricht, die aktiv, verzögert oder geplant sein kann. |
Unterstützung des Partitionsschlüssels
Dieser Starter unterstützt Service Bus-Partitionierung durch Festlegen des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenheader. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.
Empfohlen: Verwenden Sie ServiceBusMessageHeaders.PARTITION_KEY
als Schlüssel der Kopfzeile.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Nicht empfohlen, derzeit unterstützt: AzureHeaders.PARTITION_KEY
als Schlüssel der Kopfzeile.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Anmerkung
Wenn sowohl ServiceBusMessageHeaders.PARTITION_KEY
als auch AzureHeaders.PARTITION_KEY
in den Nachrichtenkopfzeilen festgelegt werden, wird ServiceBusMessageHeaders.PARTITION_KEY
bevorzugt.
Sitzungsunterstützung
In diesem Beispiel wird veranschaulicht, wie Die Sitzungs-ID einer Nachricht in der Anwendung manuell festgelegt wird.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Anmerkung
Wenn die ServiceBusMessageHeaders.SESSION_ID
in den Nachrichtenkopfzeilen festgelegt wird und auch ein anderer ServiceBusMessageHeaders.PARTITION_KEY
Header festgelegt wird, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.
Anpassen von ServiceBus-Clienteigenschaften
Entwickler können AzureServiceClientBuilderCustomizer
zum Anpassen von Service Bus-Clienteigenschaften verwenden. Im folgenden Beispiel wird die eigenschaft sessionIdleTimeout
in ServiceBusClientBuilder
angepasst:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Proben
Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.
Spring Integration in Azure Storage Queue
Schlüsselkonzepte
Azure Queue Storage ist ein Dienst zum Speichern großer Anzahl von Nachrichten. Sie greifen über authentifizierte Anrufe über HTTP oder HTTPS auf Nachrichten von überall auf der Welt zu. Eine Warteschlangennachricht kann bis zu 64 KB groß sein. Eine Warteschlange kann Millionen von Nachrichten bis zum Gesamtkapazitätslimit eines Speicherkontos enthalten. Warteschlangen werden häufig verwendet, um einen Arbeitsbestand zu erstellen, der asynchron verarbeitet werden kann.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Konfiguration
Dieser Start bietet die folgenden Konfigurationsoptionen:
Verbindungskonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit der Azure Storage-Warteschlange verwendet werden.
Anmerkung
Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-storage-queue:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.storage.queue.enabled | boolesch | Gibt an, ob eine Azure Storage-Warteschlange aktiviert ist. |
spring.cloud.azure.storage.queue.connection-string | Schnur | Namespace-Verbindungszeichenfolgenwert der Speicherwarteschlange. |
spring.cloud.azure.storage.queue.accountName | Schnur | Name des Speicherwarteschlangenkontos. |
spring.cloud.azure.storage.queue.accountKey | Schnur | Speicherwarteschlangenkontoschlüssel. |
spring.cloud.azure.storage.queue.endpoint | Schnur | Endpunkt des Speicherwarteschlangendiensts. |
spring.cloud.azure.storage.queue.sasToken | Schnur | Sas-Token-Anmeldeinformationen |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion, die beim Erstellen von API-Anforderungen verwendet wird. |
spring.cloud.azure.storage.queue.messageEncoding | Schnur | Codierung von Warteschlangennachrichten. |
Grundlegende Nutzung
Senden von Nachrichten an die Azure Storage-Warteschlange
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Erstellen Sie
DefaultMessageHandler
mit demStorageQueueTemplate
Bean, um Nachrichten an die Speicherwarteschlange zu senden.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Senden von Nachrichten mithilfe des Gateways.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Empfangen von Nachrichten aus der Azure Storage-Warteschlange
Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.
Erstellen Sie einen Nachrichtenkanal als Eingabekanal.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Erstellen Sie
StorageQueueMessageSource
mit demStorageQueueTemplate
Bean, um Nachrichten an die Speicherwarteschlange zu empfangen.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Erstellen Sie eine Nachrichtenempfängerbindung mit StorageQueueMessageSource, die im letzten Schritt über den zuvor erstellten Nachrichtenkanal erstellt wurde.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Proben
Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.