Freigeben über


Spring Cloud Azure-Unterstützung für Spring Integration

Dieser Artikel gilt für:✅ Version 4.19.0 ✅ Version 5.19.0

Spring Integration Extension for Azure stellt Spring Integration Adapter für die verschiedenen Dienste bereit, die vom Azure SDK für Javabereitgestellt werden. Wir bieten Unterstützung für Spring Integration für diese Azure-Dienste: Event Hubs, Service Bus, Storage Queue. Es folgt eine Liste der unterstützten Adapter:

Federintegration mit Azure Event Hubs

Schlüsselkonzepte

Azure Event Hubs ist eine Big Data Streaming-Plattform und ein Ereignisaufnahmedienst. Sie kann Millionen von Ereignissen pro Sekunde empfangen und verarbeiten. An einen Event Hub gesendete Daten können mithilfe eines beliebigen Echtzeitanalyseanbieters oder Batch-/Speicheradapters transformiert und gespeichert werden.

Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter. Diese Adapter bieten eine höhere Abstraktionsebene über die Unterstützung von Spring für Remoting, Messaging und Planung. Das Spring Integration für Event Hubs Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter und Gateways für Azure Event Hubs.

Anmerkung

RxJava-Support-APIs werden ab Version 4.0.0 gelöscht. Weitere Informationen finden Sie unter Javadoc.

Verbrauchergruppe

Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.

Partitionierungsunterstützung

Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischem Neuausgleich zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welche Partition im Besitz des Verbrauchers ist. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den meisten schwer geladenen Verbrauchern zu stehlen, um den Workloadausgleich zu erreichen.

Um die Lastenausgleichsstrategie anzugeben, können Entwickler EventHubsContainerProperties für die Konfiguration verwenden. Ein Beispiel zum Konfigurieren EventHubsContainerProperties.

Batch-Consumerunterstützung

Die EventHubsInboundChannelAdapter unterstützt den Batch-Nutzungsmodus. Um ihn zu aktivieren, können Benutzer den Listenermodus beim Erstellen einer ListenerMode.BATCH Instanz als EventHubsInboundChannelAdapter angeben. Wenn diese Option aktiviert ist, wird eine Nachricht, deren Nutzlast eine Liste von Batchereignissen ist, empfangen und an den nachgeschalteten Kanal übergeben. Jeder Nachrichtenkopf wird auch als Liste konvertiert, von denen der Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Für die kommunalen Kopfzeilen der Partitions-ID, des Prüfpunkts und der letzten Queued-Eigenschaften werden sie als einzelner Wert für den gesamten Batch von Ereignissen gemeinsam dargestellt. Weitere Informationen finden Sie im Abschnitt Event Hubs-Nachrichtenkopfzeilen.

Anmerkung

Der Prüfpunktheader ist nur vorhanden, wenn MANUELL Prüfpunktmodus verwendet wird.

Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH und MANUAL. BATCH Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald sie empfangen wurden. MANUAL Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Bei Verwendung wird der Prüfpunkter- an den Nachrichtenkopf übergeben, und Benutzer können ihn verwenden, um Prüfpunkte zu erledigen.

Die Batchaufwendungsrichtlinie kann durch Eigenschaften von max-size und max-wait-timeangegeben werden, wobei max-size eine erforderliche Eigenschaft ist, während max-wait-time optional ist. Um die Strategie für die Batchnutzung anzugeben, können Entwickler EventHubsContainerProperties für die Konfiguration verwenden. Ein Beispiel zum Konfigurieren EventHubsContainerProperties.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden drei Konfigurationsoptionen:

Verbindungskonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.

Anmerkung

Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-eventhubs:

Eigentum Art Beschreibung
spring.cloud.azure.eventhubs.enabled boolesch Gibt an, ob azure Event Hubs aktiviert ist.
spring.cloud.azure.eventhubs.connection-string Schnur Event Hubs Namespace-Verbindungszeichenfolgenwert.
spring.cloud.azure.eventhubs.namespace Schnur Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen.
spring.cloud.azure.eventhubs.domain-name Schnur Domänenname eines Azure Event Hubs-Namespacewerts.
spring.cloud.azure.eventhubs.custom-endpoint-address Schnur Benutzerdefinierte Endpunktadresse.
spring.cloud.azure.eventhubs.shared-connection Boolesch Gibt an, ob der zugrunde liegende EventProcessorClient und EventHubProducerAsyncClient dieselbe Verbindung verwenden. Standardmäßig wird eine neue Verbindung für jeden erstellten Event Hub-Client erstellt und verwendet.

Prüfpunktkonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.

Anmerkung

Ab Version 4.0.0, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, wird kein Speichercontainer automatisch erstellt.

Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-starter-integration-eventhubs:

Eigentum Art Beschreibung
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolesch Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Schnur Name für das Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Schnur Zugriffsschlüssel für Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Schnur Name des Speichercontainers.

Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure. oder dem Präfix von spring.cloud.azure.eventhubs.processor.checkpoint-storekonfiguriert werden.

Konfigurationseigenschaften des Event Hub-Prozessors

Die EventHubsInboundChannelAdapter verwendet die EventProcessorClient, um Nachrichten von einem Event Hub zu nutzen, um die Gesamteigenschaften eines EventProcessorClientzu konfigurieren, können Entwickler EventHubsContainerProperties für die Konfiguration verwenden. Weitere Informationen zum Arbeiten mit EventHubsInboundChannelAdapter.

Grundlegende Nutzung

Senden von Nachrichten an Azure Event Hubs

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit dem EventHubsTemplate Bean, um Nachrichten an Event Hubs zu senden.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Empfangen von Nachrichten von Azure Event Hubs

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie EventHubsInboundChannelAdapter mit dem EventHubsMessageListenerContainer Bean, um Nachrichten von Event Hubs zu empfangen.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit EventHubsInboundChannelAdapter über den zuvor erstellten Nachrichtenkanal.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurieren von EventHubsMessageConverter zum Anpassen von objectMapper

EventHubsMessageConverter wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapper anpassen können.

Batch-Consumerunterstützung

Um Nachrichten von Event Hubs in Batches zu nutzen, ähnelt es dem obigen Beispiel, neben den Benutzern sollten die batchaufwendigen Konfigurationsoptionen für EventHubsInboundChannelAdapterfestgelegt werden.

Beim Erstellen EventHubsInboundChannelAdaptersollte der Listenermodus als BATCHfestgelegt werden. Legen Sie beim Erstellen von EventHubsMessageListenerContainerden Prüfpunktmodus entweder als MANUAL oder BATCHfest, und die Batchoptionen können bei Bedarf konfiguriert werden.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs-Nachrichtenkopfzeilen

In der folgenden Tabelle wird veranschaulicht, wie Die Nachrichteneigenschaften von Event Hubs den Kopfzeilen von Spring-Nachrichten zugeordnet werden. Für Azure Event Hubs wird die Nachricht als eventaufgerufen.

Zuordnung zwischen Event Hubs-Nachrichten-/Ereigniseigenschaften und Federnachrichtenkopfzeilen im Datensatzlistenermodus:

Event Hubs-Ereigniseigenschaften Federnachrichtenkopfkonstanten Art Beschreibung
Enqueued time EventHubsHeaders#ENQUEUED_TIME Augenblick Der Zeitpunkt, zu dem das Ereignis in UTC in der Event Hub-Partition enqueuiert wurde.
Offset EventHubsHeaders#OFFSET Lang Der Offset des Ereignisses, als es von der zugeordneten Event Hub-Partition empfangen wurde.
Partitionsschlüssel AzureHeaders#PARTITION_KEY Schnur Der Partitionshashschlüssel, wenn er beim ursprünglichen Veröffentlichen des Ereignisses festgelegt wurde.
Partitions-ID AzureHeaders#RAW_PARTITION_ID Schnur Die Partitions-ID des Event Hub.
Sequenznummer EventHubsHeaders#SEQUENCE_NUMBER Lang Die Sequenznummer, die dem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition enqueuiert wurde.
Letzte Queued-Ereigniseigenschaften EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Die Eigenschaften des letzten enqueued-Ereignisses in dieser Partition.
NA AzureHeaders#CHECKPOINTER Prüfpunkt Die Kopfzeile für den Prüfpunkt der jeweiligen Nachricht.

Benutzer können die Nachrichtenkopfzeilen für die zugehörigen Informationen jedes Ereignisses analysieren. Um einen Nachrichtenkopf für das Ereignis festzulegen, werden alle angepassten Kopfzeilen als Anwendungseigenschaft eines Ereignisses platziert, wobei der Header als Eigenschaftsschlüssel festgelegt wird. Wenn Ereignisse von Event Hubs empfangen werden, werden alle Anwendungseigenschaften in den Nachrichtenkopf konvertiert.

Anmerkung

Nachrichtenkopfzeilen des Partitionsschlüssels, enqueuierte Zeit, Offset- und Sequenznummer werden nicht unterstützt, um manuell festgelegt zu werden.

Wenn der Batch-Consumer-Modus aktiviert ist, werden die spezifischen Kopfzeilen von Batchnachrichten wie folgt aufgelistet, die eine Liste der Werte aus jedem einzelnen Event Hubs-Ereignis enthalten.

Zuordnung zwischen Event Hubs Message/Event Properties und Spring Message Headers im Batchlistener-Modus:

Event Hubs-Ereigniseigenschaften Spring Batch-Nachrichtenkopfkonstanten Art Beschreibung
Enqueued time EventHubsHeaders#ENQUEUED_TIME Liste der Sofortnachrichten Liste der sofortigen Ereignisse in UTC, wann jedes Ereignis in der Event Hub-Partition enqueuiert wurde.
Offset EventHubsHeaders#OFFSET Liste der langen Liste des Offsets jedes Ereignisses, wenn es von der zugeordneten Event Hub-Partition empfangen wurde.
Partitionsschlüssel AzureHeaders#PARTITION_KEY Liste der Zeichenfolgen Liste des Partitionshashingschlüssels, wenn er beim ursprünglichen Veröffentlichen jedes Ereignisses festgelegt wurde.
Sequenznummer EventHubsHeaders#SEQUENCE_NUMBER Liste der langen Liste der Sequenznummer, die jedem Ereignis zugewiesen wurde, als es in der zugeordneten Event Hub-Partition abgefragt wurde.
Systemeigenschaften EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Liste der Karte Liste der Systemeigenschaften jedes Ereignisses.
Anwendungseigenschaften EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Liste der Karte Liste der Anwendungseigenschaften jedes Ereignisses, in denen alle angepassten Nachrichtenkopfzeilen oder Ereigniseigenschaften platziert werden.

Anmerkung

Bei der Veröffentlichung von Nachrichten werden alle oben genannten Batchheader aus den Nachrichten entfernt, sofern vorhanden.

Proben

Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.

Federintegration mit Azure Service Bus

Schlüsselkonzepte

Spring Integration ermöglicht einfaches Messaging in Spring-basierten Anwendungen und unterstützt die Integration mit externen Systemen über deklarative Adapter.

Das Spring Integration for Azure Service Bus-Erweiterungsprojekt bietet ein- und ausgehende Kanaladapter für Azure Service Bus.

Anmerkung

CompletableFuture-Support-APIs sind ab Version 2.10.0 veraltet und werden von Reaktorkern ab Version 4.0.0 ersetzt. Weitere Informationen finden Sie unter Javadoc.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden 2 Teile der Konfigurationsoptionen:

Verbindungskonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.

Anmerkung

Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-servicebus:

Eigentum Art Beschreibung
spring.cloud.azure.servicebus.enabled boolesch Gibt an, ob ein Azure Service Bus aktiviert ist.
spring.cloud.azure.servicebus.connection-string Schnur Service Bus-Namespace-Verbindungszeichenfolgenwert.
spring.cloud.azure.servicebus.custom-endpoint-address Schnur Die benutzerdefinierte Endpunktadresse, die beim Herstellen einer Verbindung mit Service Bus verwendet werden soll.
spring.cloud.azure.servicebus.namespace Schnur Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen.
spring.cloud.azure.servicebus.domain-name Schnur Domänenname eines Azure Service Bus-Namespacewerts.

Konfigurationseigenschaften des Dienstbusprozessors

Die ServiceBusInboundChannelAdapter verwendet die ServiceBusProcessorClient, um Nachrichten zu nutzen, um die Gesamteigenschaften eines ServiceBusProcessorClientzu konfigurieren, können Entwickler ServiceBusContainerProperties für die Konfiguration verwenden. Weitere Informationen zum Arbeiten mit ServiceBusInboundChannelAdapter.

Grundlegende Nutzung

Senden von Nachrichten an Azure Service Bus

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit dem ServiceBusTemplate Bean, um Nachrichten an Service Bus zu senden, und legen Sie den Entitätstyp für die ServiceBusTemplate fest. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Empfangen von Nachrichten von Azure Service Bus

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie ServiceBusInboundChannelAdapter mit dem ServiceBusMessageListenerContainer Bean, um Nachrichten an Service Bus zu empfangen. In diesem Beispiel wird die Servicebuswarteschlange als Beispiel akzeptiert.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit ServiceBusInboundChannelAdapter über den zuvor erstellten Nachrichtenkanal.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Konfigurieren von ServiceBusMessageConverter zum Anpassen von objectMapper

ServiceBusMessageConverter wird als konfigurierbare Bohnen erstellt, damit Benutzer ObjectMapperanpassen können.

Dienstbus-Nachrichtenkopfzeilen

Bei einigen ServiceBus-Headern, die mehreren Springheaderkonstanten zugeordnet werden können, wird die Priorität verschiedener Federheader aufgelistet.

Zuordnung zwischen ServiceBus-Headern und Federkopfzeilen:

Service Bus-Nachrichtenkopfzeilen und -eigenschaften Federnachrichtenkopfkonstanten Art Konfigurierbar Beschreibung
Inhaltstyp MessageHeaders#CONTENT_TYPE Schnur Ja Der RFC2045 Inhaltstypdeskriptor der Nachricht.
Korrelations-ID ServiceBusMessageHeaders#CORRELATION_ID Schnur Ja Die Korrelations-ID der Nachricht
Nachrichten-ID ServiceBusMessageHeaders#MESSAGE_ID Schnur Ja Die Nachrichten-ID der Nachricht, diese Kopfzeile hat höhere Priorität als MessageHeaders#ID.
Nachrichten-ID MessageHeaders#ID UUID Ja Die Nachrichten-ID der Nachricht, diese Kopfzeile hat eine niedrigere Priorität als ServiceBusMessageHeaders#MESSAGE_ID.
Partitionsschlüssel ServiceBusMessageHeaders#PARTITION_KEY Schnur Ja Der Partitionsschlüssel zum Senden der Nachricht an eine partitionierte Entität.
Antworten auf MessageHeaders#REPLY_CHANNEL Schnur Ja Die Adresse einer Entität, an die Antworten gesendet werden sollen.
Antworten auf sitzungs-ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Schnur Ja Der Wert der ReplyToGroupId-Eigenschaft der Nachricht.
Geplante Queue-Zeit utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ja Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine höhere Priorität als AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Geplante Queue-Zeit utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Ganze Zahl Ja Der Zeitpunkt, zu dem die Nachricht in Service Bus queuiert werden soll, hat diese Kopfzeile eine niedrigere Priorität als ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Sitzungs-ID ServiceBusMessageHeaders#SESSION_ID Schnur Ja Der Sitzungs-IDentifier für eine sitzungsfähige Entität.
Zeit zum Leben ServiceBusMessageHeaders#TIME_TO_LIVE Dauer Ja Die Dauer, bis diese Nachricht abläuft.
An ServiceBusMessageHeaders#TO Schnur Ja Die "to"-Adresse der Nachricht, die für die zukünftige Verwendung in Routingszenarien reserviert ist und derzeit vom Broker selbst ignoriert wird.
Betreff ServiceBusMessageHeaders#SUBJECT Schnur Ja Der Betreff für die Nachricht.
Beschreibung des Fehlers "Inaktiver Buchstabe" ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Schnur Nein Die Beschreibung für eine Nachricht, die inaktiv war.
Grund für inaktive Buchstaben ServiceBusMessageHeaders#DEAD_LETTER_REASON Schnur Nein Der Grund, warum eine Nachricht in totschreibend war.
Quelle für inaktive Buchstaben ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Schnur Nein Die Entität, in der die Nachricht inaktiv war.
Lieferanzahl ServiceBusMessageHeaders#DELIVERY_COUNT lang Nein Die Häufigkeit, mit der diese Nachricht an Clients übermittelt wurde.
Nummer der Enqueued-Sequenz ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lang Nein Die queuierte Sequenznummer, die einer Nachricht von Service Bus zugewiesen ist.
Enqueued time ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nein Das Datum, zu dem diese Nachricht in Service Bus enqueuiert wurde.
Läuft ab bei ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nein Das Datum, zu dem diese Nachricht abläuft.
Sperrtoken ServiceBusMessageHeaders#LOCK_TOKEN Schnur Nein Das Sperrtoken für die aktuelle Nachricht.
Gesperrt bis ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nein Das Datum, zu dem die Sperre dieser Nachricht abläuft.
Sequenznummer ServiceBusMessageHeaders#SEQUENCE_NUMBER lang Nein Die eindeutige Nummer, die einer Nachricht von Service Bus zugewiesen ist.
Zustand ServiceBusMessageHeaders#STATE ServiceBusMessageState Nein Der Status der Nachricht, die aktiv, verzögert oder geplant sein kann.

Unterstützung des Partitionsschlüssels

Dieser Starter unterstützt Service Bus-Partitionierung durch Festlegen des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenheader. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.

Empfohlen: Verwenden Sie ServiceBusMessageHeaders.PARTITION_KEY als Schlüssel der Kopfzeile.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nicht empfohlen, derzeit unterstützt: AzureHeaders.PARTITION_KEY als Schlüssel der Kopfzeile.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Anmerkung

Wenn sowohl ServiceBusMessageHeaders.PARTITION_KEY als auch AzureHeaders.PARTITION_KEY in den Nachrichtenkopfzeilen festgelegt werden, wird ServiceBusMessageHeaders.PARTITION_KEY bevorzugt.

Sitzungsunterstützung

In diesem Beispiel wird veranschaulicht, wie Die Sitzungs-ID einer Nachricht in der Anwendung manuell festgelegt wird.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Anmerkung

Wenn die ServiceBusMessageHeaders.SESSION_ID in den Nachrichtenkopfzeilen festgelegt wird und auch ein anderer ServiceBusMessageHeaders.PARTITION_KEY Header festgelegt wird, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.

Anpassen von ServiceBus-Clienteigenschaften

Entwickler können AzureServiceClientBuilderCustomizer zum Anpassen von Service Bus-Clienteigenschaften verwenden. Im folgenden Beispiel wird die eigenschaft sessionIdleTimeout in ServiceBusClientBuilderangepasst:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Proben

Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.

Spring Integration in Azure Storage Queue

Schlüsselkonzepte

Azure Queue Storage ist ein Dienst zum Speichern großer Anzahl von Nachrichten. Sie greifen über authentifizierte Anrufe über HTTP oder HTTPS auf Nachrichten von überall auf der Welt zu. Eine Warteschlangennachricht kann bis zu 64 KB groß sein. Eine Warteschlange kann Millionen von Nachrichten bis zum Gesamtkapazitätslimit eines Speicherkontos enthalten. Warteschlangen werden häufig verwendet, um einen Arbeitsbestand zu erstellen, der asynchron verarbeitet werden kann.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Konfiguration

Dieser Start bietet die folgenden Konfigurationsoptionen:

Verbindungskonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit der Azure Storage-Warteschlange verwendet werden.

Anmerkung

Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-starter-integration-storage-queue:

Eigentum Art Beschreibung
spring.cloud.azure.storage.queue.enabled boolesch Gibt an, ob eine Azure Storage-Warteschlange aktiviert ist.
spring.cloud.azure.storage.queue.connection-string Schnur Namespace-Verbindungszeichenfolgenwert der Speicherwarteschlange.
spring.cloud.azure.storage.queue.accountName Schnur Name des Speicherwarteschlangenkontos.
spring.cloud.azure.storage.queue.accountKey Schnur Speicherwarteschlangenkontoschlüssel.
spring.cloud.azure.storage.queue.endpoint Schnur Endpunkt des Speicherwarteschlangendiensts.
spring.cloud.azure.storage.queue.sasToken Schnur Sas-Token-Anmeldeinformationen
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion, die beim Erstellen von API-Anforderungen verwendet wird.
spring.cloud.azure.storage.queue.messageEncoding Schnur Codierung von Warteschlangennachrichten.

Grundlegende Nutzung

Senden von Nachrichten an die Azure Storage-Warteschlange

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  1. Erstellen Sie DefaultMessageHandler mit dem StorageQueueTemplate Bean, um Nachrichten an die Speicherwarteschlange zu senden.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Erstellen Sie eine Nachrichtengatewaybindung mit dem obigen Nachrichtenhandler über einen Nachrichtenkanal.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Senden von Nachrichten mithilfe des Gateways.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Empfangen von Nachrichten aus der Azure Storage-Warteschlange

  1. Füllen Sie die Konfigurationsoptionen für Anmeldeinformationen aus.

  2. Erstellen Sie einen Nachrichtenkanal als Eingabekanal.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Erstellen Sie StorageQueueMessageSource mit dem StorageQueueTemplate Bean, um Nachrichten an die Speicherwarteschlange zu empfangen.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Erstellen Sie eine Nachrichtenempfängerbindung mit StorageQueueMessageSource, die im letzten Schritt über den zuvor erstellten Nachrichtenkanal erstellt wurde.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Proben

Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.