Spring Cloud Azure-Unterstützung für Spring Cloud Stream
Dieser Artikel gilt für:✅ Version 4.19.0 ✅ Version 5.19.0
Spring Cloud Stream ist ein Framework für die Erstellung hoch skalierbarer ereignisgesteuerter Mikroservices, die mit freigegebenen Messagingsystemen verbunden sind.
Das Framework bietet ein flexibles Programmiermodell, das auf bereits etablierten und vertrauten Spring-Idioms und bewährten Methoden basiert. Zu diesen bewährten Methoden gehören Unterstützung für persistente Pub/Sub-Semantik, Consumergruppen und zustandsbehaftete Partitionen.
Zu den aktuellen Binderimplementierungen gehören:
-
spring-cloud-azure-stream-binder-eventhubs
– weitere Informationen finden Sie unter Spring Cloud Stream Binder für Azure Event Hubs -
spring-cloud-azure-stream-binder-servicebus
– weitere Informationen finden Sie unter Spring Cloud Stream Binder für Azure Service Bus
Spring Cloud Stream Binder für Azure Event Hubs
Schlüsselkonzepte
Der Spring Cloud Stream Binder für Azure Event Hubs stellt die Bindungsimplementierung für das Spring Cloud Stream-Framework bereit. Diese Implementierung verwendet Spring Integration Event Hubs-Kanaladapter auf der Grundlage. Aus Der Perspektive des Designs ist Event Hubs ähnlich wie Kafka. Außerdem kann über die Kafka-API auf Event Hubs zugegriffen werden. Wenn Ihr Projekt eine enge Abhängigkeit von der Kafka-API hat, können Sie Events Hub mit kafka-API-Beispiel-
Verbrauchergruppe
Event Hubs bietet ähnliche Unterstützung von Consumergruppen wie Apache Kafka, aber mit leichter unterschiedlicher Logik. Während Kafka alle zugesicherten Offsets im Broker speichert, müssen Sie Offsets von Event Hubs-Nachrichten speichern, die manuell verarbeitet werden. Event Hubs SDK stellt die Funktion zum Speichern solcher Offsets in Azure Storage bereit.
Partitionierungsunterstützung
Event Hubs bietet ein ähnliches Konzept der physischen Partition wie Kafka. Im Gegensatz zu Kafkas automatischer Rebalancing zwischen Verbrauchern und Partitionen bietet Event Hubs jedoch eine Art präemptiver Modus. Das Speicherkonto fungiert als Lease, um zu bestimmen, welcher Verbraucher welche Partition besitzt. Wenn ein neuer Verbraucher beginnt, versucht er, einige Partitionen von den am stärksten geladenen Verbrauchern zu stehlen, um die Arbeitsauslastungsbilanz zu erreichen.
Um die Lastenausgleichsstrategie anzugeben, werden Eigenschaften von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
bereitgestellt. Weitere Informationen finden Sie im Abschnitt Consumer-Eigenschaften.
Batch-Consumerunterstützung
Spring Cloud Azure Stream Event Hubs-Ordner unterstützt Spring Cloud Stream Batch Consumer Feature.
Um mit dem Batch-Consumer-Modus zu arbeiten, legen Sie die eigenschaft spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
auf true
fest. Wenn diese Option aktiviert ist, wird eine Nachricht mit einer Nutzlast einer Liste von Batchereignissen empfangen und an die Consumer
-Funktion übergeben. Jeder Nachrichtenkopf wird auch in eine Liste konvertiert, deren Inhalt der zugeordnete Headerwert ist, der von jedem Ereignis analysiert wird. Die kommunalen Header der Partitions-ID, des Prüfpunkters und der letzten Enqueued-Eigenschaften werden als einzelner Wert dargestellt, da der gesamte Batch von Ereignissen denselben Wert aufweist. Weitere Informationen finden Sie in den Event Hubs-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.
Anmerkung
Der Prüfpunktheader ist nur vorhanden, wenn der MANUAL
Prüfpunktmodus verwendet wird.
Die Prüfpunkterstellung von Batch-Consumer unterstützt zwei Modi: BATCH
und MANUAL
.
BATCH
Modus ist ein automatischer Prüfpunktmodus, um den gesamten Batch von Ereignissen zusammen zu überwachen, sobald der Ordner sie empfängt.
MANUAL
Modus besteht darin, die Ereignisse von Benutzern zu überprüfen. Bei Verwendung wird die Checkpointer
an den Nachrichtenkopf übergeben, und Benutzer können sie verwenden, um Prüfpunkte zu erledigen.
Sie können die Batchgröße angeben, indem Sie die eigenschaften max-size
und max-wait-time
festlegen, die über ein Präfix von spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
verfügen. Die max-size
-Eigenschaft ist erforderlich, und die max-wait-time
Eigenschaft ist optional. Weitere Informationen finden Sie im Abschnitt Consumer-Eigenschaften.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Alternativ können Sie auch den Spring Cloud Azure Stream Event Hubs Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Konfiguration
Der Ordner stellt die folgenden drei Konfigurationsoptionen bereit:
Verbindungskonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Event Hubs verwendet werden.
Anmerkung
Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolesch | Gibt an, ob azure Event Hubs aktiviert ist. |
spring.cloud.azure.eventhubs.connection-string | Schnur | Event Hubs Namespace-Verbindungszeichenfolgenwert. |
spring.cloud.azure.eventhubs.namespace | Schnur | Event Hubs-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen. |
spring.cloud.azure.eventhubs.domain-name | Schnur | Domänenname eines Azure Event Hubs-Namespacewerts. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Schnur | Benutzerdefinierte Endpunktadresse. |
Trinkgeld
Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Event Hubs-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure.
oder dem Präfix von spring.cloud.azure.eventhubs.
konfiguriert werden.
Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data
verwandten Rollen gewährt werden, finden Sie im Abschnitt Grundlegende Verwendung Abschnitt Spring Could Azure Resource Manager.
Eigenschaften der Prüfpunktkonfiguration
Dieser Abschnitt enthält die Konfigurationsoptionen für den Speicher-Blobs-Dienst, der zum Beibehalten des Partitionsbesitzes und der Prüfpunktinformationen verwendet wird.
Anmerkung
Ab Version 4.0.0 wird, wenn die Eigenschaft von spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists nicht manuell aktiviert ist, kein Speichercontainer automatisch mit dem Namen aus spring.cloud.stream.bindings.binding-name.destination.destinationerstellt.
Prüfpunkt konfigurierbarer Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Boolesch | Gibt an, ob das Erstellen von Containern zulässig ist, falls nicht vorhanden. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Schnur | Name für das Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Schnur | Zugriffsschlüssel für Speicherkonto. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Schnur | Name des Speichercontainers. |
Trinkgeld
Allgemeine Azure Service SDK-Konfigurationsoptionen sind auch für den Speicherspeicher für Speicher-Blob-Prüfpunktspeicher konfigurierbar. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure.
oder dem Präfix von spring.cloud.azure.eventhubs.processor.checkpoint-store
konfiguriert werden.
Azure Event Hubs Binding-Konfigurationseigenschaften
Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.
Consumereigenschaften
Diese Eigenschaften werden über EventHubsConsumerProperties
verfügbar gemacht.
Anmerkung
Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Event Hubs seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Prüfpunktmodus, der verwendet wird, wenn Verbraucher entscheiden, wie die Prüfpunktmeldung ausgeführt werden soll |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Ganze Zahl | Bestimmt die Nachrichtenmenge für jede Partition, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn PARTITION_COUNT Prüfpunktmodus verwendet wird. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Dauer | Legt das Zeitintervall fest, um einen Prüfpunkt zu erledigen. Wird nur wirksam, wenn TIME Prüfpunktmodus verwendet wird. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max Größe | Ganze Zahl | Die maximale Anzahl von Ereignissen in einem Batch. Erforderlich für den Batch-Consumer-Modus. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Dauer | Die maximale Zeitdauer für die Batchaufwärmung. Wird nur wirksam, wenn der Batch-Consumer-Modus aktiviert ist und optional ist. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Dauer | Die Intervallzeitdauer für die Aktualisierung. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Die Lastenausgleichsstrategie. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Dauer | Die Zeitdauer, nach der der Besitz der Partition abläuft. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolesch | Gibt an, ob der Ereignisprozessor Informationen zum letzten enqueued-Ereignis auf der zugehörigen Partition anfordern soll und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Ganze Zahl | Die Anzahl, die vom Consumer verwendet wird, um die Anzahl der Ereignisse zu steuern, die der Event Hub-Consumer aktiv empfängt und lokal in die Warteschlange stellt. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Zuordnen des Schlüssels als Partitions-ID und Werte von StartPositionProperties |
Die Zuordnung, die die Ereignisposition enthält, die für jede Partition verwendet werden soll, wenn kein Prüfpunkt für die Partition im Prüfpunktspeicher vorhanden ist. Diese Zuordnung wird von der Partitions-ID abgeschlüsselt. |
Anmerkung
Die initial-partition-event-position
Konfiguration akzeptiert eine map
, um die Anfangsposition für jeden Event Hub anzugeben. Daher ist der Schlüssel die Partitions-ID, und der Wert ist von StartPositionProperties
, die Eigenschaften von Offset, Sequenznummer, enqueuierte Datumszeit und ob einschließlich enthält. Sie können sie z. B. als
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Erweiterte Consumerkonfiguration
Die oben Verbindung, Prüfpunkt-und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerverbraucher, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
konfigurieren können.
Produzenteneigenschaften
Diese Eigenschaften werden über EventHubsProducerProperties
verfügbar gemacht.
Anmerkung
Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Event Hubs seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-eventhubs:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | boolesch | Die Switch-Kennzeichnung für die Synchronisierung des Produzenten. Wenn true, wartet der Produzent nach einem Sendevorgang auf eine Antwort. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | lang | Die Zeitspanne, die nach einem Sendevorgang auf eine Antwort wartet. Wird nur wirksam, wenn ein Synchronisierungsproduzent aktiviert ist. |
Erweiterte Produzentenkonfiguration
Die oben Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
konfigurieren können.
Grundlegende Nutzung
Senden und Empfangen von Nachrichten von/an Event Hubs
Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Definieren Sie Lieferanten und Verbraucher.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Partitionierungsunterstützung
Es wird eine PartitionSupplier
mit vom Benutzer bereitgestellten Partitionsinformationen erstellt, um die Partitionsinformationen über die zu sendende Nachricht zu konfigurieren. Das folgende Flussdiagramm zeigt den Vorgang zum Abrufen verschiedener Prioritäten für die Partitions-ID und den Schlüssel:
Batch-Consumerunterstützung
Stellen Sie die Batchkonfigurationsoptionen bereit, wie im folgenden Beispiel gezeigt:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Definieren Sie Lieferanten und Verbraucher.
Für den Prüfpunktmodus als
BATCH
können Sie den folgenden Code verwenden, um Nachrichten zu senden und in Batches zu verwenden.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Für den Prüfpunktmodus als
MANUAL
können Sie den folgenden Code verwenden, um Nachrichten zu senden und Nachrichten in Batches zu nutzen/zu nutzen.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Anmerkung
Im Batch-Nutzungsmodus ist der Standardinhaltstyp des Spring Cloud Stream-Ordners application/json
. Stellen Sie daher sicher, dass die Nachrichtennutzlast am Inhaltstyp ausgerichtet ist. Wenn Sie z. B. den Standardinhaltstyp application/json
verwenden, um Nachrichten mit String
Nutzlast zu empfangen, sollte die Nutzlast JSON String
sein, umgeben von doppelten Anführungszeichen für den ursprünglichen String
Text. Bei text/plain
Inhaltstyp kann es sich um ein String
-Objekt handeln. Weitere Informationen finden Sie unter Spring Cloud Stream Content Type Negotiation.
Behandeln von Fehlermeldungen
Behandeln von Ausgehenden Bindungsfehlermeldungen
Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens
errorChannel
. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um ausgehende Bindungsfehlermeldungen zu behandeln.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Behandeln eingehender Bindungsfehlermeldungen
Spring Cloud Stream Event Hubs Binder unterstützt eine Lösung zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: Fehlerhandler.
Fehlerhandler:
Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem sie eine
Consumer
hinzufügen, dieErrorMessage
Instanzen akzeptiert. Weitere Informationen finden Sie unter Behandeln von Fehlermeldungen in der Spring Cloud Stream-Dokumentation.Bindungsstandardfehlerhandler
Konfigurieren Sie eine einzelne
Consumer
Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Außerdem müssen Sie die eigenschaft
spring.cloud.stream.default.error-handler-definition
auf den Funktionsnamen festlegen.Bindungsspezifischer Fehlerhandler
Konfigurieren Sie eine
Consumer
Bean, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Fehlerkanal für eingehende Bindung und hat eine höhere Priorität als der Bindungsstandardfehlerhandler:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Außerdem müssen Sie die eigenschaft
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
auf den Funktionsnamen festlegen.
Event Hubs-Nachrichtenkopfzeilen
Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt Event Hubs-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.
Unterstützung mehrerer Ordner
Die Verbindung mit mehreren Event Hubs-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird eine Verbindungszeichenfolge als Beispiel verwendet. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt. Sie können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.
Um mehrere Ordner mit Event Hubs zu verwenden, konfigurieren Sie die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Anmerkung
In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
verwenden.Wir müssen zwei Lieferanten und zwei Verbraucher definieren:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Ressourcenbereitstellung
Event Hubs-Ordner unterstützt die Bereitstellung von Event Hub- und Consumergruppen. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Proben
Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.
Spring Cloud Stream Binder für Azure Service Bus
Schlüsselkonzepte
Der Spring Cloud Stream Binder für Azure Service Bus stellt die Bindungsimplementierung für das Spring Cloud Stream Framework bereit. Diese Implementierung verwendet Spring Integration Service Bus Channel Adapter auf der Grundlage.
Geplante Nachricht
Dieser Ordner unterstützt das Senden von Nachrichten an ein Thema zur verzögerten Verarbeitung. Benutzer können geplante Nachrichten mit Kopfzeile x-delay
in Millisekunden eine Verzögerungszeit für die Nachricht ausdrücken. Die Nachricht wird nach x-delay
Millisekunden an die jeweiligen Themen übermittelt.
Verbrauchergruppe
Service Bus Topic bietet ähnliche Unterstützung der Consumergruppe wie Apache Kafka, aber mit leichter unterschiedlicher Logik.
Dieser Ordner basiert auf Subscription
eines Themas, das als Verbrauchergruppe fungiert.
Setup von Abhängigkeiten
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Alternativ können Sie auch den Spring Cloud Azure Stream Service Bus Starter verwenden, wie im folgenden Beispiel für Maven gezeigt:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Konfiguration
Der Ordner stellt die folgenden beiden Konfigurationsoptionen bereit:
Verbindungskonfigurationseigenschaften
Dieser Abschnitt enthält die Konfigurationsoptionen, die zum Herstellen einer Verbindung mit Azure Service Bus verwendet werden.
Anmerkung
Wenn Sie einen Sicherheitsprinzipal zum Authentifizieren und Autorisieren mit Microsoft Entra-ID für den Zugriff auf eine Azure-Ressource verwenden, lesen Sie Autorisieren des Zugriffs mit Microsoft Entra ID, um sicherzustellen, dass dem Sicherheitsprinzipal die ausreichende Berechtigung für den Zugriff auf die Azure-Ressource gewährt wurde.
Konfigurierbare Verbindungseigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigentum | Art | Beschreibung |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolesch | Gibt an, ob ein Azure Service Bus aktiviert ist. |
spring.cloud.azure.servicebus.connection-string | Schnur | Service Bus-Namespace-Verbindungszeichenfolgenwert. |
spring.cloud.azure.servicebus.custom-endpoint-address | Schnur | Die benutzerdefinierte Endpunktadresse, die beim Herstellen einer Verbindung mit Service Bus verwendet werden soll. |
spring.cloud.azure.servicebus.namespace | Schnur | Service Bus-Namespacewert, der das Präfix des FQDN ist. Ein FQDN sollte aus NamespaceName.DomainName bestehen. |
spring.cloud.azure.servicebus.domain-name | Schnur | Domänenname eines Azure Service Bus-Namespacewerts. |
Anmerkung
Allgemeine Azure Service SDK-Konfigurationsoptionen können auch für den Feder Cloud Azure Stream Service Bus-Ordner konfiguriert werden. Die unterstützten Konfigurationsoptionen werden in Spring Cloud Azure-Konfigurationeingeführt und können entweder mit dem einheitlichen Präfix spring.cloud.azure.
oder dem Präfix von spring.cloud.azure.servicebus.
konfiguriert werden.
Der Ordner unterstützt auch Spring Could Azure Resource Manager standardmäßig. Informationen zum Abrufen der Verbindungszeichenfolge mit Sicherheitsprinzipale, die nicht mit Data
verwandten Rollen gewährt werden, finden Sie im Abschnitt Grundlegende Verwendung Abschnitt Spring Could Azure Resource Manager.
Azure Service Bus-Bindungskonfigurationseigenschaften
Die folgenden Optionen sind in vier Abschnitte unterteilt: Consumer Properties, Advanced Consumer Configurations, Producer Properties and Advanced Producer Configurations.
Consumereigenschaften
Diese Eigenschaften werden über ServiceBusConsumerProperties
verfügbar gemacht.
Anmerkung
Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Service Bus seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Konfigurierbare Verbrauchereigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigentum | Art | Vorgabe | Beschreibung |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | boolesch | FALSCH | Wenn die fehlgeschlagenen Nachrichten an den DLQ weitergeleitet werden. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Ganze Zahl | 1 | Max. gleichzeitige Nachrichten, die der Dienstbusprozessorclient verarbeiten soll. Wenn die Sitzung aktiviert ist, gilt sie für jede Sitzung. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-gleichzeitige Sitzungen | Ganze Zahl | null | Maximale Anzahl gleichzeitiger Sitzungen, die zu einem bestimmten Zeitpunkt verarbeitet werden sollen. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolesch | null | Gibt an, ob die Sitzung aktiviert ist. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Ganze Zahl | 0 | Die Prefetch-Anzahl des Service Bus-Prozessorclients. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | nichts | Der Typ der Unterwarteschlange, mit der eine Verbindung hergestellt werden soll. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Dauer | 5m | Die Zeitspanne, um die automatische Verlängerung der Sperre fortzusetzen. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Der Empfangsmodus des Service Bus-Prozessorclients. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolesch | STIMMT | Gibt an, ob Nachrichten automatisch abgleichen sollen. Wenn dieser Wert auf "false" festgelegt ist, wird eine Nachrichtenkopfzeile von Checkpointer hinzugefügt, damit Entwickler Nachrichten manuell abgleichen können. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes | Long | 1024 | Die maximale Größe der Warteschlange/des Themas in Megabyte, bei der es sich um die Größe des arbeitsspeichers handelt, der für die Warteschlange/das Thema zugeordnet ist. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Dauer | P10675199DT2H48M5.4775807S. (10675199 Tage, 2 Stunden, 48 Minuten, 5 Sekunden und 477 Millisekunden) | Die Dauer, nach der die Nachricht abläuft, beginnend ab dem Zeitpunkt, an den die Nachricht an Service Bus gesendet wird. |
Wichtig
Wenn Sie den Azure Resource Manager (ARM) verwenden, müssen Sie die eigenschaft spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
konfigurieren. Weitere Informationen finden Sie im Beispiel servicebus-queue-binder-arm auf GitHub.
Erweiterte Consumerkonfiguration
Die oben genannten Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerverbraucher, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
konfigurieren können.
Produzenteneigenschaften
Diese Eigenschaften werden über ServiceBusProducerProperties
verfügbar gemacht.
Anmerkung
Um Wiederholungen zu vermeiden, unterstützt Spring Cloud Azure Stream Binder Service Bus seit Version 4.19.0 und 5.19.0 das Festlegen von Werten für alle Kanäle im Format von spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Konfigurierbare Eigenschaften von spring-cloud-azure-stream-binder-servicebus:
Eigentum | Art | Vorgabe | Beschreibung |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | boolesch | FALSCH | Switch flag for sync of producer. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | lang | 10000 | Timeoutwert für das Senden des Produzenten. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | Service Bus-Entitätstyp des Produzenten, erforderlich für den Bindungshersteller. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | Long | 1024 | Die maximale Größe der Warteschlange/des Themas in Megabyte, bei der es sich um die Größe des arbeitsspeichers handelt, der für die Warteschlange/das Thema zugeordnet ist. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Dauer | P10675199DT2H48M5.4775807S. (10675199 Tage, 2 Stunden, 48 Minuten, 5 Sekunden und 477 Millisekunden) | Die Dauer, nach der die Nachricht abläuft, beginnend ab dem Zeitpunkt, an den die Nachricht an Service Bus gesendet wird. |
Wichtig
Bei Verwendung des Bindungsherstellers muss die Eigenschaft spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
konfiguriert werden.
Erweiterte Produzentenkonfiguration
Die oben Verbindung und allgemeinen Azure SDK-Client Konfigurationsunterstützung für jeden Ordnerhersteller, den Sie mit dem Präfix spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
konfigurieren können.
Grundlegende Nutzung
Senden und Empfangen von Nachrichten von/an Service Bus
Füllen Sie die Konfigurationsoptionen mit Anmeldeinformationen aus.
Konfigurieren Sie für Anmeldeinformationen als Verbindungszeichenfolge die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Konfigurieren Sie für Anmeldeinformationen als Dienstprinzipal die folgenden Eigenschaften in Ihrer application.yml-Datei:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Konfigurieren Sie für Anmeldeinformationen als verwaltete Identitäten die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Definieren Sie Lieferanten und Verbraucher.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Unterstützung des Partitionsschlüssels
Der Ordner unterstützt Service Bus-Partitionierung, indem die Einstellung des Partitionsschlüssels und der Sitzungs-ID im Nachrichtenkopf zugelassen wird. In diesem Abschnitt wird erläutert, wie Sie den Partitionsschlüssel für Nachrichten festlegen.
Spring Cloud Stream stellt eine SpEL-Ausdruckseigenschaft des Partitionsschlüssels spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
bereit. Legen Sie diese Eigenschaft beispielsweise als "'partitionKey-' + headers[<message-header-key>]"
fest, und fügen Sie eine Kopfzeile namens "Message-header-key" hinzu. Spring Cloud Stream verwendet den Wert für diesen Header beim Auswerten des Ausdrucks, um einen Partitionsschlüssel zuzuweisen. Der folgende Code stellt einen Beispielhersteller bereit:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Sitzungsunterstützung
Der Ordner unterstützt Nachrichtensitzungen service bus. Die Sitzungs-ID einer Nachricht kann über den Nachrichtenkopf festgelegt werden.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Anmerkung
Gemäß Dienstbuspartitionierunghat die Sitzungs-ID eine höhere Priorität als Partitionsschlüssel. Wenn also sowohl ServiceBusMessageHeaders#SESSION_ID
- als auch ServiceBusMessageHeaders#PARTITION_KEY
Header festgelegt werden, wird der Wert der Sitzungs-ID schließlich verwendet, um den Wert des Partitionsschlüssels zu überschreiben.
Behandeln von Fehlermeldungen
Behandeln von Ausgehenden Bindungsfehlermeldungen
Standardmäßig erstellt Spring Integration einen globalen Fehlerkanal namens
errorChannel
. Konfigurieren Sie den folgenden Nachrichtenendpunkt, um die Fehlermeldung für ausgehende Bindung zu behandeln.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Behandeln eingehender Bindungsfehlermeldungen
Spring Cloud Stream Service Bus Binder unterstützt zwei Lösungen zur Behandlung von Fehlern für eingehende Nachrichtenbindungen: der Ordnerfehlerhandler und -handler.
Binder-Fehlerhandler:
Der Standardordnerfehlerhandler behandelt die eingehende Bindung. Sie verwenden diesen Handler, um fehlgeschlagene Nachrichten an die Warteschlange mit inaktiven Buchstaben zu senden, wenn
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
aktiviert ist. Andernfalls werden die fehlgeschlagenen Nachrichten abgebrochen. Der Sammelmappenfehlerhandler schließt sich gegenseitig mit anderen bereitgestellten Fehlerhandlern aus.Fehlerhandler:
Spring Cloud Stream macht einen Mechanismus verfügbar, mit dem Sie einen benutzerdefinierten Fehlerhandler bereitstellen können, indem sie eine
Consumer
hinzufügen, dieErrorMessage
Instanzen akzeptiert. Weitere Informationen finden Sie unter Behandeln von Fehlermeldungen in der Spring Cloud Stream-Dokumentation.Bindungsstandardfehlerhandler
Konfigurieren Sie eine einzelne
Consumer
Bohnen, um alle eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Standardfunktion abonniert jeden eingehenden Bindungsfehlerkanal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Außerdem müssen Sie die eigenschaft
spring.cloud.stream.default.error-handler-definition
auf den Funktionsnamen festlegen.Bindungsspezifischer Fehlerhandler
Konfigurieren Sie eine
Consumer
Bean, um die spezifischen eingehenden Bindungsfehlermeldungen zu nutzen. Die folgende Funktion abonniert den spezifischen Eingehenden Bindungsfehlerkanal mit einer höheren Priorität als der Bindungsstandardfehlerhandler.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Außerdem müssen Sie die eigenschaft
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
auf den Funktionsnamen festlegen.
Dienstbus-Nachrichtenkopfzeilen
Die unterstützten grundlegenden Nachrichtenkopfzeilen finden Sie im Abschnitt Service Bus-Nachrichtenkopfzeilen Abschnitt Spring Cloud Azure-Unterstützung für Spring Integration.
Anmerkung
Beim Festlegen des Partitionsschlüssels ist die Priorität des Nachrichtenkopfs höher als die Spring Cloud Stream-Eigenschaft. Daher wird spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
nur wirksam, wenn keines der ServiceBusMessageHeaders#SESSION_ID
und ServiceBusMessageHeaders#PARTITION_KEY
Header konfiguriert ist.
Unterstützung mehrerer Ordner
Die Verbindung mit mehreren ServiceBus-Namespaces wird auch mithilfe mehrerer Ordner unterstützt. In diesem Beispiel wird die Verbindungszeichenfolge als Beispiel verwendet. Anmeldeinformationen von Dienstprinzipalen und verwalteten Identitäten werden ebenfalls unterstützt, Benutzer können verwandte Eigenschaften in den Umgebungseinstellungen der einzelnen Ordner festlegen.
Um mehrere Ordner von ServiceBus zu verwenden, konfigurieren Sie die folgenden Eigenschaften in Ihrer application.yml Datei:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Anmerkung
In der vorherigen Anwendungsdatei wird gezeigt, wie Sie einen einzelnen Standardabfrager für die Anwendung für alle Bindungen konfigurieren. Wenn Sie den Poller für eine bestimmte Bindung konfigurieren möchten, können Sie eine Konfiguration wie
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
verwenden.wir müssen zwei Lieferanten und zwei Verbraucher definieren
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Ressourcenbereitstellung
Der Servicebusordner unterstützt die Bereitstellung von Warteschlangen, Themen und Abonnements. Benutzer können die folgenden Eigenschaften verwenden, um die Bereitstellung zu aktivieren.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Anmerkung
Die für tenant-id
zulässigen Werte sind: common
, organizations
, consumers
oder die Mandanten-ID. Weitere Informationen zu diesen Werten finden Sie im Abschnitt
Anpassen von ServiceBus-Clienteigenschaften
Entwickler können AzureServiceClientBuilderCustomizer
zum Anpassen von Service Bus-Clienteigenschaften verwenden. Im folgenden Beispiel wird die eigenschaft sessionIdleTimeout
in ServiceBusClientBuilder
angepasst:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Proben
Weitere Informationen finden Sie in den Azure-spring-boot-samples Repository auf GitHub.