Spring Cloud Azure-ondersteuning voor Spring Cloud Stream
Dit artikel is van toepassing op:✅ versie 4.19.0 ✅ versie 5.19.0
Spring Cloud Stream is een framework voor het bouwen van uiterst schaalbare, gebeurtenisgestuurde microservices die zijn verbonden met gedeelde berichtensystemen.
Het framework biedt een flexibel programmeermodel dat is gebaseerd op reeds gevestigde en bekende Spring-idiomen en best practices. Deze aanbevolen procedures omvatten ondersteuning voor permanente pub-/subsemantiek, consumentengroepen en stateful partities.
Huidige binder-implementaties zijn onder andere:
-
spring-cloud-azure-stream-binder-eventhubs
: zie Spring Cloud Stream Binder voor Azure Event Hubs voor meer informatie -
spring-cloud-azure-stream-binder-servicebus
: zie Spring Cloud Stream Binder voor Azure Service Bus
Spring Cloud Stream Binder voor Azure Event Hubs
Sleutelbegrippen
Spring Cloud Stream Binder voor Azure Event Hubs biedt de bindende implementatie voor het Spring Cloud Stream-framework. Deze implementatie maakt gebruik van Spring Integration Event Hubs-kanaaladapters bij de basis. Vanuit het oogpunt van het ontwerp is Event Hubs vergelijkbaar met Kafka. Event Hubs kunnen ook worden geopend via kafka-API. Als uw project nauw afhankelijk is van de Kafka-API, kunt u proberen Events Hub met kafka API-voorbeeld
Consumentengroep
Event Hubs biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica. Hoewel Kafka alle vastgelegde offsets in de broker opslaat, moet u offsets opslaan van Event Hubs-berichten die handmatig worden verwerkt. Event Hubs SDK biedt de functie voor het opslaan van dergelijke offsets in Azure Storage.
Ondersteuning voor partitionering
Event Hubs biedt een vergelijkbaar concept van fysieke partitie als Kafka. Maar in tegenstelling tot de automatische herverdeling tussen consumenten en partities van Kafka, biedt Event Hubs een soort preventieve modus. Het opslagaccount fungeert als lease om te bepalen welke consument eigenaar is van welke partitie. Wanneer een nieuwe consument wordt gestart, wordt geprobeerd om enkele partities te stelen van de meest zwaar belaste consumenten om het werkbelastingsaldo te bereiken.
Om de taakverdelingsstrategie op te geven, worden de eigenschappen van spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
opgegeven. Zie de sectie Consumenteneigenschappen voor meer informatie.
Ondersteuning voor Batch-consumenten
Spring Cloud Azure Stream Event Hubs-binder ondersteunt Spring Cloud Stream Batch Consumer-functie.
Als u met de batchconsumermodus wilt werken, stelt u de eigenschap spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
in op true
. Wanneer dit is ingeschakeld, wordt een bericht met een nettolading van een lijst met batchgebeurtenissen ontvangen en doorgegeven aan de functie Consumer
. Elke berichtkop wordt ook geconverteerd naar een lijst, waarvan de inhoud de bijbehorende headerwaarde is die wordt geparseerd uit elke gebeurtenis. De gemeenschappelijke headers van partitie-id, controlepunt en laatste enqueued eigenschappen worden weergegeven als één waarde omdat de hele batch gebeurtenissen dezelfde waarde deelt. Zie de sectie Event Hubs-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor meer informatie.
Notitie
De koptekst van het controlepunt bestaat alleen wanneer de MANUAL
controlepuntmodus wordt gebruikt.
Controlepunten van batchconsumer ondersteunen twee modi: BATCH
en MANUAL
.
BATCH
modus is een modus voor automatische controlepunten om de hele batch gebeurtenissen samen te controleren zodra de binder deze ontvangt.
MANUAL
modus is het controleren van de gebeurtenissen door gebruikers. Wanneer dit wordt gebruikt, wordt de Checkpointer
doorgegeven aan de berichtkop en kunnen gebruikers dit gebruiken om controlepunten uit te voeren.
U kunt de batchgrootte opgeven door de eigenschappen max-size
en max-wait-time
in te stellen met een voorvoegsel van spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. De eigenschap max-size
is nodig en de eigenschap max-wait-time
is optioneel. Zie de sectie Consumenteneigenschappen voor meer informatie.
Afhankelijkheid instellen
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
U kunt ook de Spring Cloud Azure Stream Event Hubs Starter gebruiken, zoals wordt weergegeven in het volgende voorbeeld voor Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuratie
De binder biedt de volgende drie onderdelen van configuratieopties:
Eigenschappen van verbindingsconfiguratie
Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Event Hubs.
Notitie
Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.
Configureerbare verbindingseigenschappen van spring-cloud-azure-stream-binder-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleaans | Of een Azure Event Hubs is ingeschakeld. |
spring.cloud.azure.eventhubs.connection-string | Snaar | Waarde van verbindingsreeks voor Event Hubs-naamruimte. |
spring.cloud.azure.eventhubs.namespace | Snaar | Event Hubs-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Snaar | Domeinnaam van een Azure Event Hubs-naamruimtewaarde. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Snaar | Aangepast eindpuntadres. |
Fooi
Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor de Event Hubs-binder van Spring Cloud Azure Stream. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure.
of het voorvoegsel van spring.cloud.azure.eventhubs.
.
De binder ondersteunt ook standaard Spring Could Azure Resource Manager-. Zie het gedeelte Data
van Spring Could Azure Resource Managervoor meer informatie over het ophalen van de verbindingsreeks met beveiligingsprinciplen die niet worden verleend met gerelateerde rollen.
Eigenschappen van controlepuntconfiguratie
Deze sectie bevat de configuratieopties voor de Storage Blobs-service, die wordt gebruikt voor het behouden van het eigendom van de partitie en controlepuntgegevens.
Notitie
Wanneer vanaf versie 4.0.0 de eigenschap van spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists is niet handmatig is ingeschakeld, wordt er geen opslagcontainer automatisch gemaakt met de naam van spring.cloud.stream.bindings.bindings.binding-name.destination.
Controlepunt configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleaans | Of u containers wilt maken als deze niet bestaat. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Snaar | Naam voor het opslagaccount. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Snaar | Toegangssleutel voor opslagaccount. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Snaar | Naam van opslagcontainer. |
Fooi
Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor opslagblobcontrolepuntopslag. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure.
of het voorvoegsel van spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Azure Event Hubs Binding-configuratie-eigenschappen
De volgende opties zijn onderverdeeld in vier secties: Consumer Properties, Advanced Consumer Configurations, Producer Properties en Advanced Producer Configurations.
Consumenteneigenschappen
Deze eigenschappen worden weergegeven via EventHubsConsumerProperties
.
Notitie
Om herhaling te voorkomen, ondersteunt Spring Cloud Azure Stream Binder Event Hubs, sinds versie 4.19.0 en 5.19.0, instellingen voor alle kanalen, in de indeling van spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
.
Configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | Controlepuntmodus die wordt gebruikt wanneer de consument besluit hoe een controlepuntbericht moet worden verzonden |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | Geheel getal | Bepaalt de hoeveelheid bericht voor elke partitie om één controlepunt uit te voeren. Wordt alleen van kracht wanneer PARTITION_COUNT controlepuntmodus wordt gebruikt. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | Duur | Hiermee bepaalt u het tijdsinterval om één controlepunt uit te voeren. Wordt alleen van kracht wanneer TIME controlepuntmodus wordt gebruikt. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max grootte | Geheel getal | Het maximum aantal gebeurtenissen in een batch. Vereist voor de batchconsumermodus. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Duur | De maximale tijdsduur voor batchverwerking. Wordt alleen van kracht wanneer de batchconsumermodus is ingeschakeld en is optioneel. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Duur | De intervaltijdsduur voor het bijwerken. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | De taakverdelingsstrategie. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Duur | De tijdsduur waarna het eigendom van de partitie verloopt. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Booleaans | Of de gebeurtenisprocessor informatie moet aanvragen over de laatste enqueuedgebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Geheel getal | Het aantal dat door de consument wordt gebruikt om het aantal gebeurtenissen te beheren dat de Event Hub-consument actief ontvangt en in de wachtrij plaatst. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Wijs de sleutel toe als de partitie-id en waarden van StartPositionProperties |
De kaart met de gebeurtenispositie die voor elke partitie moet worden gebruikt als er geen controlepunt voor de partitie bestaat in het controlepuntarchief. Deze kaart is uitgeschakeld op de partitie-id. |
Notitie
De initial-partition-event-position
-configuratie accepteert een map
om de initiële positie voor elke Event Hub op te geven. De sleutel is dus de partitie-id en de waarde is van StartPositionProperties
, waaronder eigenschappen van offset, volgnummer, datum en inclusief. U kunt deze bijvoorbeeld instellen als
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Geavanceerde consumentenconfiguratie
De bovenstaande verbinding, controlepunten algemene configuratieondersteuning voor Azure SDK-client voor elke bindergebruiker, die u kunt configureren met het voorvoegsel spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Producenteigenschappen
Deze eigenschappen worden weergegeven via EventHubsProducerProperties
.
Notitie
Om herhaling te voorkomen, ondersteunt Spring Cloud Azure Stream Binder Event Hubs, sinds versie 4.19.0 en 5.19.0, instellingen voor alle kanalen, in de indeling van spring.cloud.stream.eventhubs.default.producer.<property>=<value>
.
Configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | booleaans | De schakelvlag voor synchronisatie van producent. Indien waar, wacht de producent op een antwoord na een verzendbewerking. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | lang | De hoeveelheid tijd die moet worden gewacht op een antwoord na een verzendbewerking. Wordt alleen van kracht wanneer een synchronisatieproducent is ingeschakeld. |
Geavanceerde configuratie van producent
De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke binderproducent, die u kunt configureren met het voorvoegsel spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Basisgebruik
Berichten verzenden en ontvangen van/naar Event Hubs
Vul de configuratieopties in met referentiegegevens.
Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Leverancier en consument definiëren.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Ondersteuning voor partitionering
Er wordt een PartitionSupplier
met door de gebruiker verstrekte partitiegegevens gemaakt om de partitiegegevens te configureren over het te verzenden bericht. In het volgende stroomdiagram ziet u het proces voor het verkrijgen van verschillende prioriteiten voor de partitie-id en sleutel:
Ondersteuning voor Batch-consumenten
Geef de batchconfiguratieopties op, zoals wordt weergegeven in het volgende voorbeeld:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Leverancier en consument definiëren.
Voor controlepuntenmodus als
BATCH
kunt u de volgende code gebruiken om berichten te verzenden en in batches te gebruiken.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Voor controlepuntenmodus als
MANUAL
kunt u de volgende code gebruiken om berichten te verzenden en/controlepunt in batches te gebruiken/te gebruiken.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Notitie
In de batchverwerkingsmodus is het standaardinhoudstype van Spring Cloud Stream-binder application/json
, dus zorg ervoor dat de nettolading van het bericht is afgestemd op het inhoudstype. Wanneer u bijvoorbeeld het standaardinhoudstype van application/json
gebruikt om berichten te ontvangen met String
nettolading, moet de nettolading worden JSON String
, omgeven door dubbele aanhalingstekens voor de oorspronkelijke String
tekst. Hoewel het voor text/plain
inhoudstype rechtstreeks een String
object kan zijn. Zie Spring Cloud Stream-inhoudstypeonderhandelingvoor meer informatie.
Foutberichten verwerken
Uitgaande bindingsfoutberichten verwerken
Spring Integration maakt standaard een globaal foutkanaal met de naam
errorChannel
. Configureer het volgende berichteindpunt voor het afhandelen van uitgaande bindingsfoutberichten.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Binnenkomende bindingsfoutberichten verwerken
Spring Cloud Stream Event Hubs Binder ondersteunt één oplossing voor het afhandelen van fouten voor de binnenkomende berichtbindingen: fouthandlers.
fouthandler:
Spring Cloud Stream biedt een mechanisme waarmee u een aangepaste fouthandler kunt bieden door een
Consumer
toe te voegen dieErrorMessage
exemplaren accepteert. Zie Foutberichten verwerken in de Spring Cloud Stream-documentatie voor meer informatie.Binding-standaardfouthandler
Configureer één
Consumer
bean om alle binnenkomende bindingsfoutberichten te gebruiken. De volgende standaardfunctie abonneert zich op elk binnenkomende bindingsfoutkanaal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
U moet ook de eigenschap
spring.cloud.stream.default.error-handler-definition
instellen op de naam van de functie.Bindingsspecifieke fouthandler
Configureer een
Consumer
bean om de specifieke binnenkomende bindingsfoutberichten te gebruiken. De volgende functie abonneert zich op het specifieke inkomende bindingsfoutkanaal en heeft een hogere prioriteit dan de standaardfouthandler voor binding:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
U moet ook de eigenschap
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
instellen op de naam van de functie.
Event Hubs-berichtkoppen
Zie de sectie Event Hubs-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor de basisberichtkoppen die worden ondersteund.
Ondersteuning voor meerdere binder
Verbinding met meerdere Event Hubs-naamruimten wordt ook ondersteund door meerdere binders te gebruiken. In dit voorbeeld wordt een verbindingsreeks gebruikt als voorbeeld. Referenties van service-principals en beheerde identiteiten worden ook ondersteund. U kunt gerelateerde eigenschappen instellen in de omgevingsinstellingen van elke binder.
Als u meerdere binders wilt gebruiken met Event Hubs, configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Notitie
In het vorige toepassingsbestand ziet u hoe u één standaard poller voor de toepassing configureert voor alle bindingen. Als u de poller voor een specifieke binding wilt configureren, kunt u een configuratie zoals
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
gebruiken.We moeten twee leveranciers en twee consumenten definiëren:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Resource-inrichting
Event Hubs-binder biedt ondersteuning voor het inrichten van Event Hub en consumentengroep. Gebruikers kunnen de volgende eigenschappen gebruiken om het inrichten in te schakelen.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Monsters
Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.
Spring Cloud Stream Binder voor Azure Service Bus
Sleutelbegrippen
Spring Cloud Stream Binder voor Azure Service Bus biedt de bindende implementatie voor het Spring Cloud Stream Framework. Deze implementatie maakt gebruik van Spring Integration Service Bus-kanaaladapters bij de basis.
Gepland bericht
Deze binder ondersteunt het verzenden van berichten naar een onderwerp voor vertraagde verwerking. Gebruikers kunnen geplande berichten verzenden met kopteksten x-delay
in milliseconden een vertragingstijd voor het bericht uitdrukken. Het bericht wordt na x-delay
milliseconden aan de desbetreffende onderwerpen bezorgd.
Consumentengroep
Service Bus-onderwerp biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica.
Deze binder is afhankelijk van Subscription
van een onderwerp om te fungeren als een consumentengroep.
Afhankelijkheid instellen
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
U kunt ook de Spring Cloud Azure Stream Service Bus Starter gebruiken, zoals wordt weergegeven in het volgende voorbeeld voor Maven:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuratie
De binder biedt de volgende twee onderdelen van configuratieopties:
Eigenschappen van verbindingsconfiguratie
Deze sectie bevat de configuratieopties die worden gebruikt om verbinding te maken met Azure Service Bus.
Notitie
Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.
Configureerbare verbindingseigenschappen van spring-cloud-azure-stream-binder-servicebus:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleaans | Of een Azure Service Bus is ingeschakeld. |
spring.cloud.azure.servicebus.connection-string | Snaar | Service Bus-naamruimteverbindingsreekswaarde. |
spring.cloud.azure.servicebus.custom-endpoint-address | Snaar | Het aangepaste eindpuntadres dat moet worden gebruikt bij het maken van verbinding met Service Bus. |
spring.cloud.azure.servicebus.namespace | Snaar | Service Bus-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Snaar | Domeinnaam van een Azure Service Bus-naamruimtewaarde. |
Notitie
Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor de Spring Cloud Azure Stream Service Bus-binder. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure.
of het voorvoegsel van spring.cloud.azure.servicebus.
.
De binder ondersteunt ook standaard Spring Could Azure Resource Manager-. Zie het gedeelte Data
van Spring Could Azure Resource Managervoor meer informatie over het ophalen van de verbindingsreeks met beveiligingsprinciplen die niet worden verleend met gerelateerde rollen.
Configuratie-eigenschappen van Azure Service Bus-binding
De volgende opties zijn onderverdeeld in vier secties: Consumer Properties, Advanced Consumer Configurations, Producer Properties en Advanced Producer Configurations.
Consumenteneigenschappen
Deze eigenschappen worden weergegeven via ServiceBusConsumerProperties
.
Notitie
Om herhaling te voorkomen, biedt Spring Cloud Azure Stream Binder Service Bus ondersteuning voor het instellen van waarden voor alle kanalen, in de indeling van spring.cloud.stream.servicebus.default.consumer.<property>=<value>
.
Configureerbare eigenschappen van spring-cloud-azure-stream-binder-servicebus:
Eigenschap | Type | Verstek | Beschrijving |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | booleaans | vals | Als de mislukte berichten worden doorgestuurd naar de DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Geheel getal | 1 | Maximaal aantal gelijktijdige berichten dat de Service Bus-processorclient moet verwerken. Wanneer sessie is ingeschakeld, is deze van toepassing op elke sessie. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-gelijktijdige sessies | Geheel getal | nul | Maximum aantal gelijktijdige sessies dat op een bepaald moment moet worden verwerkt. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Booleaans | nul | Of sessie is ingeschakeld. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Geheel getal | 0 | Het aantal prefetchs van de Service Bus-processorclient. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.subwachtrij | Subqueue | geen | Het type subwachtrij waarmee verbinding moet worden gemaakt. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Duur | 5 min. | De hoeveelheid tijd die nodig is om door te gaan met automatisch vernieuwen van de vergrendeling. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | De ontvangstmodus van de Service Bus-processorclient. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Booleaans | waar | Of berichten automatisch moeten worden vereffend. Als deze optie is ingesteld als onwaar, wordt er een berichtkop van Checkpointer toegevoegd om ontwikkelaars in staat te stellen berichten handmatig te vereffenen. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes | Lang | 1024 | De maximale grootte van de wachtrij/het onderwerp in megabytes. Dit is de grootte van het geheugen dat is toegewezen voor de wachtrij/het onderwerp. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | Duur | P10675199DT2H48M5.4775807S. (10675199 dagen, 2 uur, 48 minuten, 5 seconden en 477 milliseconden) | De duur waarna het bericht verloopt, beginnend vanaf het moment dat het bericht naar Service Bus wordt verzonden. |
Belangrijk
Wanneer u de Azure Resource Manager- (ARM) gebruikt, moet u de eigenschap spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
configureren. Zie het voorbeeld servicebus-queue-binder-arm op GitHub voor meer informatie.
Geavanceerde consumentenconfiguratie
De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke bindergebruiker, die u kunt configureren met het voorvoegsel spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Producenteigenschappen
Deze eigenschappen worden weergegeven via ServiceBusProducerProperties
.
Notitie
Om herhaling te voorkomen, biedt Spring Cloud Azure Stream Binder Service Bus ondersteuning voor het instellen van waarden voor alle kanalen, in de indeling van spring.cloud.stream.servicebus.default.producer.<property>=<value>
.
Configureerbare eigenschappen van spring-cloud-azure-stream-binder-servicebus:
Eigenschap | Type | Verstek | Beschrijving |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | booleaans | vals | Schakelvlag voor synchronisatie van producent. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | lang | 10000 | Time-outwaarde voor het verzenden van producent. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | nul | Service Bus-entiteitstype van de producent, vereist voor de bindingsproducent. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes | Lang | 1024 | De maximale grootte van de wachtrij/het onderwerp in megabytes. Dit is de grootte van het geheugen dat is toegewezen voor de wachtrij/het onderwerp. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | Duur | P10675199DT2H48M5.4775807S. (10675199 dagen, 2 uur, 48 minuten, 5 seconden en 477 milliseconden) | De duur waarna het bericht verloopt, beginnend vanaf het moment dat het bericht naar Service Bus wordt verzonden. |
Belangrijk
Wanneer u de bindingsproducent gebruikt, moet de eigenschap van spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
worden geconfigureerd.
Geavanceerde configuratie van producent
De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke binderproducent, die u kunt configureren met het voorvoegsel spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Basisgebruik
Berichten verzenden en ontvangen van/naar Service Bus
Vul de configuratieopties in met referentiegegevens.
Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Leverancier en consument definiëren.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Ondersteuning voor partitiesleutels
De binder ondersteunt Service Bus-partitionering door het instellen van de partitiesleutel en sessie-id in de berichtkop. In deze sectie wordt uitgelegd hoe u een partitiesleutel instelt voor berichten.
Spring Cloud Stream biedt een spEL-expressieeigenschap voor partitiesleutels spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
. Stel deze eigenschap bijvoorbeeld in als "'partitionKey-' + headers[<message-header-key>]"
en voeg een koptekst toe met de naam message-header-key. Spring Cloud Stream gebruikt de waarde voor deze header bij het evalueren van de expressie om een partitiesleutel toe te wijzen. De volgende code bevat een voorbeeldproducent:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Sessieondersteuning
De binder ondersteunt berichtsessies van Service Bus. Sessie-id van een bericht kan worden ingesteld via de berichtkop.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Notitie
Volgens Service Bus-partitioneringheeft de sessie-id een hogere prioriteit dan de partitiesleutel. Dus wanneer zowel ServiceBusMessageHeaders#SESSION_ID
als ServiceBusMessageHeaders#PARTITION_KEY
headers zijn ingesteld, wordt de waarde van de sessie-id uiteindelijk gebruikt om de waarde van de partitiesleutel te overschrijven.
Foutberichten verwerken
Uitgaande bindingsfoutberichten verwerken
Spring Integration maakt standaard een globaal foutkanaal met de naam
errorChannel
. Configureer het volgende berichteindpunt om het foutbericht voor uitgaande binding af te handelen.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Binnenkomende bindingsfoutberichten verwerken
Spring Cloud Stream Service Bus Binder ondersteunt twee oplossingen voor het afhandelen van fouten voor de binnenkomende berichtbindingen: de handler en handlers voor binderfouten.
Binder-fouthandler:
De standaardfouthandler voor binder verwerkt de binnenkomende binding. U gebruikt deze handler om mislukte berichten naar de wachtrij met onbestelbare berichten te verzenden wanneer
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
is ingeschakeld. Anders worden de mislukte berichten afgelaten. De handler voor binderfouten sluiten elkaar wederzijds uit met andere meegeleverde fouthandlers.fouthandler:
Spring Cloud Stream biedt een mechanisme waarmee u een aangepaste fouthandler kunt bieden door een
Consumer
toe te voegen dieErrorMessage
exemplaren accepteert. Zie Foutberichten verwerken in de Spring Cloud Stream-documentatie voor meer informatie.Binding-standaardfouthandler
Configureer één
Consumer
bean om alle binnenkomende bindingsfoutberichten te gebruiken. De volgende standaardfunctie abonneert zich op elk binnenkomende bindingsfoutkanaal:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
U moet ook de eigenschap
spring.cloud.stream.default.error-handler-definition
instellen op de naam van de functie.Bindingsspecifieke fouthandler
Configureer een
Consumer
bean om de specifieke binnenkomende bindingsfoutberichten te gebruiken. De volgende functie abonneert zich op het specifieke inkomende bindingsfoutkanaal met een hogere prioriteit dan de standaardfouthandler voor bindingen.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
U moet ook de eigenschap
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
instellen op de naam van de functie.
Service Bus-berichtkoppen
Zie de sectie Service Bus-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor de basisberichtkoppen die worden ondersteund.
Notitie
Bij het instellen van de partitiesleutel is de prioriteit van de berichtkop hoger dan de eigenschap Spring Cloud Stream.
spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
wordt dus alleen van kracht wanneer geen van de ServiceBusMessageHeaders#SESSION_ID
en ServiceBusMessageHeaders#PARTITION_KEY
headers zijn geconfigureerd.
Ondersteuning voor meerdere binder
Verbinding met meerdere Service Bus-naamruimten wordt ook ondersteund door meerdere binders te gebruiken. In dit voorbeeld wordt een verbindingsreeks gebruikt. Referenties van service-principals en beheerde identiteiten worden ook ondersteund. Gebruikers kunnen gerelateerde eigenschappen instellen in de omgevingsinstellingen van elke binder.
Als u meerdere binders van ServiceBus wilt gebruiken, configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Notitie
In het vorige toepassingsbestand ziet u hoe u één standaard poller voor de toepassing configureert voor alle bindingen. Als u de poller voor een specifieke binding wilt configureren, kunt u een configuratie zoals
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
gebruiken.we moeten twee leveranciers en twee consumenten definiëren
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Resource-inrichting
Service Bus Binder biedt ondersteuning voor het inrichten van wachtrijen, onderwerpen en abonnementen. Gebruikers kunnen de volgende eigenschappen gebruiken om het inrichten in te schakelen.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Eigenschappen van Service Bus-client aanpassen
Ontwikkelaars kunnen AzureServiceClientBuilderCustomizer
gebruiken om eigenschappen van de Service Bus-client aan te passen. In het volgende voorbeeld wordt de eigenschap sessionIdleTimeout
in ServiceBusClientBuilder
aangepast:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Monsters
Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.