Freigeben über


Spring Cloud Azure-Unterstützung für Spring Cloud Stream

Dieser Artikel gilt für:✅ Version 4.19.0 ✅ Version 5.19.0

Spring Cloud Stream ist ein Framework für die Erstellung hoch skalierbarer ereignisgesteuerter Mikroservices, die mit freigegebenen Messagingsystemen verbunden sind.

Das Framework bietet ein flexibles Programmiermodell, das auf bereits etablierten und vertrauten Spring-Idioms und bewährten Methoden basiert. Zu diesen bewährten Methoden gehören Unterstützung für persistente Pub/Sub-Semantik, Consumergruppen und zustandsbehaftete Partitionen.

Zu den aktuellen Binderimplementierungen gehören:

Spring Cloud Stream Binder für Azure Event Hubs

Schlüsselkonzepte

Der Spring Cloud Stream Binder für Azure Event Hubs stellt die Bindungsimplementierung für das Spring Cloud Stream-Framework bereit. Diese Implementierung verwendet Spring Integration Event Hubs-Kanaladapter auf der Grundlage. Aus Der Perspektive des Designs ist Event Hubs ähnlich wie Kafka. Außerdem kann über die Kafka-API auf Event Hubs zugegriffen werden. Wenn Ihr Projekt eine enge Abhängigkeit von der Kafka-API hat, können Sie Events Hub mit kafka-API-Beispiel-

Verbrauchergruppe

Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.

Partitionierungsunterstützung

Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischer Rebalancing zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welcher Verbraucher welche Partition besitzt. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den am stärksten geladenen Verbrauchern zu stehlen, um die Arbeitsauslastungsbilanz zu erreichen.

Um die Lastenausgleichsstrategie anzugeben, werden Eigenschaften von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* bereitgestellt. Weitere Informationen finden Sie im Abschnitt Consumer-Eigenschaften.

Batch-Consumerunterstützung

Spring Cloud Azure Stream Event Hubs-Ordner unterstützt Spring Cloud Stream Batch Consumer Feature.

Um mit dem Batch-Consumer-Modus zu arbeiten, legen Sie die eigenschaft spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode auf truefest. Wenn diese Option aktiviert ist, wird eine Nachricht mit einer Nutzlast einer Liste von Batchereignissen empfangen und an die Consumer-Funktion übergeben. Jeder Nachrichtenkopf wird auch in eine Liste konvertiert, deren Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Die kommunalen Header der Partitions-ID, des Prüfpunkters und der letzten Enqueued-Eigenschaften werden als einzelner Wert dargestellt, da der gesamte Batch von Ereignissen denselben Wert aufweist. Weitere Informationen finden Sie in den Event Hubs-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.

Anmerkung

Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL Prüfpunktmodus verwendet wird.

Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH und MANUAL. BATCH Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald der Ordner sie empfängt. MANUAL Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Bei Verwendung wird die Checkpointer an den Nachrichtenkopf übergeben, und Benutzer können sie verwenden, um Prüfpunkte zu erledigen.

Sie können die Batchgröße angeben, indem Sie die eigenschaften max-size und max-wait-time festlegen, die über ein Präfix von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.verfügen. Die max-size-Eigenschaft ist erforderlich, und die max-wait-time Eigenschaft ist optional. Weitere Informationen finden Sie im Abschnitt Consumer-Eigenschaften.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Alternativ können Sie auch den Spring Cloud Azure Stream Event Hubs Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Konfiguration

Der Ordner stellt die folgenden drei Konfigurationsoptionen bereit:

Verbindungskonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.

Anmerkung

Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigentum Art Beschreibung
spring.cloud.azure.eventhubs.enabled boolesch Gibt an, ob azure Event Hubs aktiviert ist.
spring.cloud.azure.eventhubs.connection-string Schnur Event Hubs Namespace-Verbindungszeichenfolgenwert.
spring.cloud.azure.eventhubs.namespace Schnur Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen.
spring.cloud.azure.eventhubs.domain-name Schnur Domänenname eines Azure Event Hubs-Namespacewerts.
spring.cloud.azure.eventhubs.custom-endpoint-address Schnur Benutzerdefinierte Endpunktadresse.

Trinkgeld

Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Event Hubs-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure. oder dem Präfix von spring.cloud.azure.eventhubs.konfiguriert werden.

Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data verwandten Rollen gewährt werden, finden Sie im Abschnitt Grundlegende Verwendung Abschnitt Spring Could Azure Resource Manager.

Eigenschaften der Prüfpunktkonfiguration

Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.

Anmerkung

Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch mit dem Namen aus spring.cloud.stream.bindings.binding-name.destination.destinationerstellt.

Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigentum Art Beschreibung
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolesch Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Schnur Name für das Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Schnur Zugriffsschlüssel für Speicherkonto.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Schnur Name des Speichercontainers.

Trinkgeld

Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure. oder dem Präfix von spring.cloud.azure.eventhubs.processor.checkpoint-storekonfiguriert werden.

Azure Event Hubs Binding-Konfigurationseigenschaften

Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.

Consumereigenschaften

Diese Eigenschaften werden über EventHubsConsumerPropertiesverfügbar gemacht.

Anmerkung

Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Event Hubs seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigentum Art Beschreibung
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Prüfpunktmodus, der verwendet wird, wenn Verbraucher entscheiden, wie die Prüfpunktmeldung ausgeführt werden soll
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Ganze Zahl Bestimmt die Nachrichtenmenge für jede Partition, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn PARTITION_COUNT Prüfpunktmodus verwendet wird.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Dauer Legt das Zeitintervall fest, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn TIME Prüfpunktmodus verwendet wird.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max Größe Ganze Zahl Die maximale Anzahl von Ereignissen in einem Batch. Erforderlich für den Batch-Consumer-Modus.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Dauer Die maximale Zeitdauer für die Batchaufwärmung. Wird nur wirksam, wenn der Batch-Consumer-Modus aktiviert ist und optional ist.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Dauer Die Intervallzeitdauer für die Aktualisierung.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Die Lastenausgleichsstrategie.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Dauer Die Zeitdauer, nach der der Besitz der Partition abläuft.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolesch Gibt an, ob der Ereignisprozessor Informationen zum letzten enqueued-Ereignis auf der zugehörigen Partition anfordern soll und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Ganze Zahl Die Anzahl, die vom Consumer verwendet wird, um die Anzahl der Ereignisse zu steuern, die der Event Hub-Consumer aktiv empfängt und lokal in die Warteschlange stellt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Zuordnen des Schlüssels als Partitions-ID und Werte von StartPositionProperties Die Zuordnung, die die Ereignisposition enthält, die für jede Partition verwendet werden soll, wenn kein Prüfpunkt für die Partition im Prüfpunktspeicher vorhanden ist. Diese Zuordnung wird von der Partitions-ID abgeschlüsselt.

Anmerkung

Die initial-partition-event-position Konfiguration akzeptiert eine map, um die Anfangsposition für jeden Event Hub anzugeben. Daher ist der Schlüssel die Partitions-ID, und der Wert ist von StartPositionProperties, die Eigenschaften von Offset, Sequenznummer, enqueuierte Datumszeit und ob einschließlich enthält. Sie können sie z. B. als

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Erweiterte Consumerkonfiguration

Die oben Verbindung, Prüfpunkt-und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerverbraucher, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.konfigurieren können.

Produzenteneigenschaften

Diese Eigenschaften werden über EventHubsProducerPropertiesverfügbar gemacht.

Anmerkung

Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Event Hubs seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:

Eigentum Art Beschreibung
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolesch Die Switch-Kennzeichnung für die Synchronisierung des Produzenten. Wenn true, wartet der Produzent nach einem Sendevorgang auf eine Antwort.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lang Die Zeitspanne, die nach einem Sendevorgang auf eine Antwort wartet. Wird nur wirksam, wenn ein Synchronisierungsproduzent aktiviert ist.
Erweiterte Produzentenkonfiguration

Die oben Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.konfigurieren können.

Grundlegende Nutzung

Senden und Empfangen von Nachrichten von/an Event Hubs

  1. Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Definieren Sie Lieferanten und Verbraucher.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Partitionierungsunterstützung

Es wird eine PartitionSupplier mit vom Benutzer bereitgestellten Partitionsinformationen erstellt, um die Partitionsinformationen über die zu sendende Nachricht zu konfigurieren. Das folgende Flussdiagramm zeigt den Vorgang zum Abrufen verschiedener Prioritäten für die Partitions-ID und den Schlüssel:

Diagramm mit einem Flussdiagramm des Partitionierungsunterstützungsprozesses.

Batch-Consumerunterstützung

  1. Stellen Sie die Batchkonfigurationsoptionen bereit, wie im folgenden Beispiel gezeigt:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Definieren Sie Lieferanten und Verbraucher.

    Für den Prüfpunktmodus als BATCHkönnen Sie den folgenden Code verwenden, um Nachrichten zu senden und in Batches zu verwenden.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Für den Prüfpunktmodus als MANUALkönnen Sie den folgenden Code verwenden, um Nachrichten zu senden und Nachrichten in Batches zu nutzen/zu nutzen.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Anmerkung

Im Batch-Nutzungsmodus ist der Standardinhaltstyp des Spring Cloud Stream-Ordners application/json. Stellen Sie daher sicher, dass die Nachrichtennutzlast am Inhaltstyp ausgerichtet ist. Wenn Sie z. B. den Standardinhaltstyp application/json verwenden, um Nachrichten mit String Nutzlast zu empfangen, sollte die Nutzlast JSON Stringsein, umgeben von doppelten Anführungszeichen für den ursprünglichen String Text. Bei text/plain Inhaltstyp kann es sich um ein String-Objekt handeln. Weitere Informationen finden Sie unter Spring Cloud Stream Content Type Negotiation.

Behandeln von Fehlermeldungen

  • Behandeln von Ausgehenden Bindungsfehlermeldungen

    Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens errorChannel. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um ausgehende Bindungsfehlermeldungen zu behandeln.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Behandeln eingehender Bindungsfehlermeldungen

    Spring Cloud Stream Event Hubs Binder unterstützt eine Lösung zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: Fehlerhandler.

    Fehlerhandler:

    Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem sie eine Consumer hinzufügen, die ErrorMessage Instanzen akzeptiert. Weitere Informationen finden Sie unter Behandeln von Fehlermeldungen in der Spring Cloud Stream-Dokumentation.

    • Bindungsstandardfehlerhandler

      Konfigurieren Sie eine einzelne Consumer Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Außerdem müssen Sie die eigenschaft spring.cloud.stream.default.error-handler-definition auf den Funktionsnamen festlegen.

    • Bindungsspezifischer Fehlerhandler

      Konfigurieren Sie eine Consumer Bean, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Fehlerkanal für eingehende Bindung und hat eine höhere Priorität als der Bindungsstandardfehlerhandler:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Außerdem müssen Sie die eigenschaft spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition auf den Funktionsnamen festlegen.

Event Hubs-Nachrichtenkopfzeilen

Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt Event Hubs-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.

Unterstützung mehrerer Ordner

Die Verbindung mit mehreren Event Hubs-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird eine Verbindungszeichenfolge als Beispiel verwendet. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt. Sie können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.

  1. Um mehrere Ordner mit Event Hubs zu verwenden, konfigurieren Sie die folgenden Eigenschaften in Ihrer application.yml Datei:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Anmerkung

    In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000verwenden.

  2. Wir müssen zwei Lieferanten und zwei Verbraucher definieren:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Ressourcenbereitstellung

Event Hubs-Ordner unterstützt die Bereitstellung von Event Hub- und Consumergruppen. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

Proben

Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.

Spring Cloud Stream Binder für Azure Service Bus

Schlüsselkonzepte

Der Spring Cloud Stream Binder für Azure Service Bus stellt die Bindungsimplementierung für das Spring Cloud Stream Framework bereit. Diese Implementierung verwendet Spring Integration Service Bus Channel Adapter auf der Grundlage.

Geplante Nachricht

Dieser Ordner unterstützt das Senden von Nachrichten an ein Thema zur verzögerten Verarbeitung. Benutzer können geplante Nachrichten mit Kopfzeile x-delay in Millisekunden eine Verzögerungszeit für die Nachricht ausdrücken. Die Nachricht wird nach x-delay Millisekunden an die jeweiligen Themen übermittelt.

Verbrauchergruppe

Service Bus Topic bietet ähnliche Unterstützung der Consumergruppe wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Dieser Ordner basiert auf Subscription eines Themas, das als Verbrauchergruppe fungiert.

Setup von Abhängigkeiten

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Alternativ können Sie auch den Spring Cloud Azure Stream Service Bus Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Konfiguration

Der Ordner stellt die folgenden beiden Konfigurationsoptionen bereit:

Verbindungskonfigurationseigenschaften

Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.

Anmerkung

Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.

Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigentum Art Beschreibung
spring.cloud.azure.servicebus.enabled boolesch Gibt an, ob ein Azure Service Bus aktiviert ist.
spring.cloud.azure.servicebus.connection-string Schnur Service Bus-Namespace-Verbindungszeichenfolgenwert.
spring.cloud.azure.servicebus.custom-endpoint-address Schnur Die benutzerdefinierte Endpunktadresse, die beim Herstellen einer Verbindung mit Service Bus verwendet werden soll.
spring.cloud.azure.servicebus.namespace Schnur Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen.
spring.cloud.azure.servicebus.domain-name Schnur Domänenname eines Azure Service Bus-Namespacewerts.

Anmerkung

Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Service Bus-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure. oder dem Präfix von spring.cloud.azure.servicebus.konfiguriert werden.

Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data verwandten Rollen gewährt werden, finden Sie im Abschnitt Grundlegende Verwendung Abschnitt Spring Could Azure Resource Manager.

Azure Service Bus-Bindungskonfigurationseigenschaften

Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.

Consumereigenschaften

Diese Eigenschaften werden über ServiceBusConsumerPropertiesverfügbar gemacht.

Anmerkung

Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Service Bus seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigentum Art Vorgabe Beschreibung
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolesch FALSCH Wenn die fehlgeschlagenen Nachrichten an den DLQ weitergeleitet werden.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Ganze Zahl 1 Max. gleichzeitige Nachrichten, die der Dienstbusprozessorclient verarbeiten soll. Wenn die Sitzung aktiviert ist, gilt sie für jede Sitzung.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-gleichzeitige Sitzungen Ganze Zahl null Maximale Anzahl gleichzeitiger Sitzungen, die zu einem bestimmten Zeitpunkt verarbeitet werden sollen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolesch null Gibt an, ob die Sitzung aktiviert ist.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Ganze Zahl 0 Die Prefetch-Anzahl des Service Bus-Prozessorclients.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue nichts Der Typ der Unterwarteschlange, mit der eine Verbindung hergestellt werden soll.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Dauer 5m Die Zeitspanne, um die automatische Verlängerung der Sperre fortzusetzen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Der Empfangsmodus des Service Bus-Prozessorclients.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolesch STIMMT Gibt an, ob Nachrichten automatisch abgleichen sollen. Wenn dieser Wert auf "false" festgelegt ist, wird eine Nachrichtenkopfzeile von Checkpointer hinzugefügt, damit Entwickler Nachrichten manuell abgleichen können.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes Long 1024 Die maximale Größe der Warteschlange/des Themas in Megabyte, bei der es sich um die Größe des arbeitsspeichers handelt, der für die Warteschlange/das Thema zugeordnet ist.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Dauer P10675199DT2H48M5.4775807S. (10675199 Tage, 2 Stunden, 48 Minuten, 5 Sekunden und 477 Millisekunden) Die Dauer, nach der die Nachricht abläuft, beginnend ab dem Zeitpunkt, an den die Nachricht an Service Bus gesendet wird.

Wichtig

Wenn Sie den Azure Resource Manager (ARM) verwenden, müssen Sie die eigenschaft spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type konfigurieren. Weitere Informationen finden Sie im Beispiel servicebus-queue-binder-arm auf GitHub.

Erweiterte Consumerkonfiguration

Die oben genannten Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerverbraucher, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.konfigurieren können.

Produzenteneigenschaften

Diese Eigenschaften werden über ServiceBusProducerPropertiesverfügbar gemacht.

Anmerkung

Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Service Bus seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:

Eigentum Art Vorgabe Beschreibung
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolesch FALSCH Switch flag for sync of producer.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lang 10000 Timeoutwert für das Senden des Produzenten.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null Service Bus-Entitätstyp des Produzenten, erforderlich für den Bindungshersteller.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes Long 1024 Die maximale Größe der Warteschlange/des Themas in Megabyte, bei der es sich um die Größe des arbeitsspeichers handelt, der für die Warteschlange/das Thema zugeordnet ist.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Dauer P10675199DT2H48M5.4775807S. (10675199 Tage, 2 Stunden, 48 Minuten, 5 Sekunden und 477 Millisekunden) Die Dauer, nach der die Nachricht abläuft, beginnend ab dem Zeitpunkt, an den die Nachricht an Service Bus gesendet wird.

Wichtig

Bei Verwendung des Bindungsherstellers muss die Eigenschaft spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type konfiguriert werden.

Erweiterte Produzentenkonfiguration

Die oben Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.producer.konfigurieren können.

Grundlegende Nutzung

Senden und Empfangen von Nachrichten von/an Service Bus

  1. Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.

    • Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

  • Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Definieren Sie Lieferanten und Verbraucher.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Unterstützung des Partitionsschlüssels

Der Ordner unterstützt Service Bus-Partitionierung, indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.

Spring Cloud Stream stellt eine SpEL-Ausdruckseigenschaft des Partitionsschlüssels spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionbereit. Legen Sie diese Eigenschaft beispielsweise als "'partitionKey-' + headers[<message-header-key>]" fest, und fügen Sie eine Kopfzeile namens "Message-header-key" hinzu. Spring Cloud Stream verwendet den Wert für diesen Header beim Auswerten des Ausdrucks, um einen Partitionsschlüssel zuzuweisen. Der folgende Code stellt einen Beispielhersteller bereit:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Sitzungsunterstützung

Der Ordner unterstützt Nachrichtensitzungen service bus. Die Sitzungs-ID einer Nachricht kann über den Nachrichtenkopf festgelegt werden.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Anmerkung

Gemäß Dienstbuspartitionierunghat die Sitzungs-ID eine höhere Priorität als Partitionsschlüssel. Wenn also sowohl ServiceBusMessageHeaders#SESSION_ID- als auch ServiceBusMessageHeaders#PARTITION_KEY Header festgelegt werden, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.

Behandeln von Fehlermeldungen

  • Behandeln von Ausgehenden Bindungsfehlermeldungen

    Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens errorChannel. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um die Fehlermeldung für ausgehende Bindung zu behandeln.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Behandeln eingehender Bindungsfehlermeldungen

    Spring Cloud Stream Service Bus Binder unterstützt zwei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: der Ordnerfehlerhandler und -handler.

    Binder-Fehlerhandler:

    Der Standardordnerfehlerhandler behandelt die eingehende Bindung. Sie verwenden diesen Handler, um fehlgeschlagene Nachrichten an die Warteschlange mit inaktiven Buchstaben zu senden, wenn spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected aktiviert ist. Andernfalls werden die fehlgeschlagenen Nachrichten abgebrochen. Der Sammelmappenfehlerhandler schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern aus.

    Fehlerhandler:

    Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem sie eine Consumer hinzufügen, die ErrorMessage Instanzen akzeptiert. Weitere Informationen finden Sie unter Behandeln von Fehlermeldungen in der Spring Cloud Stream-Dokumentation.

    • Bindungsstandardfehlerhandler

      Konfigurieren Sie eine einzelne Consumer Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Außerdem müssen Sie die eigenschaft spring.cloud.stream.default.error-handler-definition auf den Funktionsnamen festlegen.

    • Bindungsspezifischer Fehlerhandler

      Konfigurieren Sie eine Consumer Bean, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Eingehenden Bindungsfehlerkanal mit einer höheren Priorität als der Bindungsstandardfehlerhandler.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Außerdem müssen Sie die eigenschaft spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition auf den Funktionsnamen festlegen.

Dienstbus-Nachrichtenkopfzeilen

Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt Service Bus-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.

Anmerkung

Beim Festlegen des Partitionsschlüssels ist die Priorität des Nachrichtenkopfs höher als die Spring Cloud Stream-Eigenschaft. Daher wird spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression nur wirksam, wenn keines der ServiceBusMessageHeaders#SESSION_ID und ServiceBusMessageHeaders#PARTITION_KEY Header konfiguriert ist.

Unterstützung mehrerer Ordner

Die Verbindung mit mehreren ServiceBus-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird die Verbindungszeichenfolge als Beispiel verwendet. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt, Benutzer können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.

  1. Um mehrere Ordner von ServiceBus zu verwenden, konfigurieren Sie die folgenden Eigenschaften in Ihrer application.yml Datei:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Anmerkung

    In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000verwenden.

  2. wir müssen zwei Lieferanten und zwei Verbraucher definieren

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Ressourcenbereitstellung

Der Servicebusordner unterstützt die Bereitstellung von Warteschlangen, Themen und Abonnements. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Anmerkung

Die für tenant-id zulässigen Werte sind: common, organizations, consumersoder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt Abschnitt Abschnitt Fehler-AADSTS50020 – Benutzerkonto des Identitätsanbieters ist nicht im Mandantenvorhanden. Informationen zum Konvertieren Ihrer Einzelmandanten-App finden Sie unter Konvertieren einer Einzelmandanten-App in multitenant auf Microsoft Entra ID.

Anpassen von ServiceBus-Clienteigenschaften

Entwickler können AzureServiceClientBuilderCustomizer zum Anpassen von Service Bus-Clienteigenschaften verwenden. Im folgenden Beispiel wird die eigenschaft sessionIdleTimeout in ServiceBusClientBuilderangepasst:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Proben

Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.