Delen via


Spring Cloud Azure-ondersteuning voor Spring Cloud Stream

Dit artikel is van toepassing op:✅ versie 4.19.0 ✅ versie 5.19.0

Spring Cloud Stream is een framework voor het bouwen van uiterst schaalbare, gebeurtenisgestuurde microservices die zijn verbonden met gedeelde berichtensystemen.

Het framework biedt een flexibel programmeermodel dat is gebaseerd op reeds gevestigde en bekende Spring-idiomen en best practices. Deze aanbevolen procedures omvatten ondersteuning voor permanente pub-/subsemantiek, consumentengroepen en stateful partities.

Huidige binder-implementaties zijn onder andere:

Spring Cloud Stream Binder voor Azure Event Hubs

Sleutelbegrippen

Spring Cloud Stream Binder voor Azure Event Hubs biedt de bindende implementatie voor het Spring Cloud Stream-framework. Deze implementatie maakt gebruik van Spring Integration Event Hubs-kanaaladapters bij de basis. Vanuit het oogpunt van het ontwerp is Event Hubs vergelijkbaar met Kafka. Event Hubs kunnen ook worden geopend via kafka-API. Als uw project nauw afhankelijk is van de Kafka-API, kunt u proberen Events Hub met kafka API-voorbeeld

Consumentengroep

Event Hubs biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica. Hoewel Kafka alle vastgelegde offsets in de broker opslaat, moet u offsets opslaan van Event Hubs-berichten die handmatig worden verwerkt. Event Hubs SDK biedt de functie voor het opslaan van dergelijke offsets in Azure Storage.

Ondersteuning voor partitionering

Event Hubs biedt een vergelijkbaar concept van fysieke partitie als Kafka. Maar in tegenstelling tot de automatische herverdeling tussen consumenten en partities van Kafka, biedt Event Hubs een soort preventieve modus. Het opslagaccount fungeert als lease om te bepalen welke consument eigenaar is van welke partitie. Wanneer een nieuwe consument wordt gestart, wordt geprobeerd om enkele partities te stelen van de meest zwaar belaste consumenten om het werkbelastingsaldo te bereiken.

Om de taakverdelingsstrategie op te geven, worden de eigenschappen van spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* opgegeven. Zie de sectie Consumenteneigenschappen voor meer informatie.

Ondersteuning voor Batch-consumenten

Spring Cloud Azure Stream Event Hubs-binder ondersteunt Spring Cloud Stream Batch Consumer-functie.

Als u met de batchconsumermodus wilt werken, stelt u de eigenschap spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode in op true. Wanneer dit is ingeschakeld, wordt een bericht met een nettolading van een lijst met batchgebeurtenissen ontvangen en doorgegeven aan de functie Consumer. Elke berichtkop wordt ook geconverteerd naar een lijst, waarvan de inhoud de bijbehorende headerwaarde is die wordt geparseerd uit elke gebeurtenis. De gemeenschappelijke headers van partitie-id, controlepunt en laatste enqueued eigenschappen worden weergegeven als één waarde omdat de hele batch gebeurtenissen dezelfde waarde deelt. Zie de sectie Event Hubs-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor meer informatie.

Notitie

De koptekst van het controlepunt bestaat alleen wanneer de MANUAL controlepuntmodus wordt gebruikt.

Controlepunten van batchconsumer ondersteunen twee modi: BATCH en MANUAL. BATCH modus is een modus voor automatische controlepunten om de hele batch gebeurtenissen samen te controleren zodra de binder deze ontvangt. MANUAL modus is het controleren van de gebeurtenissen door gebruikers. Wanneer dit wordt gebruikt, wordt de Checkpointer doorgegeven aan de berichtkop en kunnen gebruikers dit gebruiken om controlepunten uit te voeren.

U kunt de batchgrootte opgeven door de eigenschappen max-size en max-wait-time in te stellen met een voorvoegsel van spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. De eigenschap max-size is nodig en de eigenschap max-wait-time is optioneel. Zie de sectie Consumenteneigenschappen voor meer informatie.

Afhankelijkheid instellen

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

U kunt ook de Spring Cloud Azure Stream Event Hubs Starter gebruiken, zoals wordt weergegeven in het volgende voorbeeld voor Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuratie

De binder biedt de volgende drie onderdelen van configuratieopties:

Eigenschappen van verbindingsconfiguratie

Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Event Hubs.

Notitie

Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.

Configureerbare verbindingseigenschappen van spring-cloud-azure-stream-binder-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.azure.eventhubs.enabled booleaans Of een Azure Event Hubs is ingeschakeld.
spring.cloud.azure.eventhubs.connection-string Snaar Waarde van verbindingsreeks voor Event Hubs-naamruimte.
spring.cloud.azure.eventhubs.namespace Snaar Event Hubs-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Snaar Domeinnaam van een Azure Event Hubs-naamruimtewaarde.
spring.cloud.azure.eventhubs.custom-endpoint-address Snaar Aangepast eindpuntadres.

Fooi

Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor de Event Hubs-binder van Spring Cloud Azure Stream. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure. of het voorvoegsel van spring.cloud.azure.eventhubs..

De binder ondersteunt ook standaard Spring Could Azure Resource Manager-. Zie het gedeelte Data van Spring Could Azure Resource Managervoor meer informatie over het ophalen van de verbindingsreeks met beveiligingsprinciplen die niet worden verleend met gerelateerde rollen.

Eigenschappen van controlepuntconfiguratie

Deze sectie bevat de configuratieopties voor de Storage Blobs-service, die wordt gebruikt voor het behouden van het eigendom van de partitie en controlepuntgegevens.

Notitie

Wanneer vanaf versie 4.0.0 de eigenschap van spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists is niet handmatig is ingeschakeld, wordt er geen opslagcontainer automatisch gemaakt met de naam van spring.cloud.stream.bindings.bindings.binding-name.destination.

Controlepunt configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleaans Of u containers wilt maken als deze niet bestaat.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Snaar Naam voor het opslagaccount.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Snaar Toegangssleutel voor opslagaccount.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Snaar Naam van opslagcontainer.

Fooi

Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor opslagblobcontrolepuntopslag. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure. of het voorvoegsel van spring.cloud.azure.eventhubs.processor.checkpoint-store.

Azure Event Hubs Binding-configuratie-eigenschappen

De volgende opties zijn onderverdeeld in vier secties: Consumer Properties, Advanced Consumer Configurations, Producer Properties en Advanced Producer Configurations.

Consumenteneigenschappen

Deze eigenschappen worden weergegeven via EventHubsConsumerProperties.

Notitie

Om herhaling te voorkomen, ondersteunt Spring Cloud Azure Stream Binder Event Hubs, sinds versie 4.19.0 en 5.19.0, instellingen voor alle kanalen, in de indeling van spring.cloud.stream.eventhubs.default.consumer.<property>=<value>.

Configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode Controlepuntmodus die wordt gebruikt wanneer de consument besluit hoe een controlepuntbericht moet worden verzonden
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count Geheel getal Bepaalt de hoeveelheid bericht voor elke partitie om één controlepunt uit te voeren. Wordt alleen van kracht wanneer PARTITION_COUNT controlepuntmodus wordt gebruikt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval Duur Hiermee bepaalt u het tijdsinterval om één controlepunt uit te voeren. Wordt alleen van kracht wanneer TIME controlepuntmodus wordt gebruikt.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max grootte Geheel getal Het maximum aantal gebeurtenissen in een batch. Vereist voor de batchconsumermodus.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Duur De maximale tijdsduur voor batchverwerking. Wordt alleen van kracht wanneer de batchconsumermodus is ingeschakeld en is optioneel.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Duur De intervaltijdsduur voor het bijwerken.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy De taakverdelingsstrategie.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Duur De tijdsduur waarna het eigendom van de partitie verloopt.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Booleaans Of de gebeurtenisprocessor informatie moet aanvragen over de laatste enqueuedgebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Geheel getal Het aantal dat door de consument wordt gebruikt om het aantal gebeurtenissen te beheren dat de Event Hub-consument actief ontvangt en in de wachtrij plaatst.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Wijs de sleutel toe als de partitie-id en waarden van StartPositionProperties De kaart met de gebeurtenispositie die voor elke partitie moet worden gebruikt als er geen controlepunt voor de partitie bestaat in het controlepuntarchief. Deze kaart is uitgeschakeld op de partitie-id.

Notitie

De initial-partition-event-position-configuratie accepteert een map om de initiële positie voor elke Event Hub op te geven. De sleutel is dus de partitie-id en de waarde is van StartPositionProperties, waaronder eigenschappen van offset, volgnummer, datum en inclusief. U kunt deze bijvoorbeeld instellen als

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Geavanceerde consumentenconfiguratie

De bovenstaande verbinding, controlepunten algemene configuratieondersteuning voor Azure SDK-client voor elke bindergebruiker, die u kunt configureren met het voorvoegsel spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Producenteigenschappen

Deze eigenschappen worden weergegeven via EventHubsProducerProperties.

Notitie

Om herhaling te voorkomen, ondersteunt Spring Cloud Azure Stream Binder Event Hubs, sinds versie 4.19.0 en 5.19.0, instellingen voor alle kanalen, in de indeling van spring.cloud.stream.eventhubs.default.producer.<property>=<value>.

Configureerbare eigenschappen van spring-cloud-azure-stream-binder-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booleaans De schakelvlag voor synchronisatie van producent. Indien waar, wacht de producent op een antwoord na een verzendbewerking.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout lang De hoeveelheid tijd die moet worden gewacht op een antwoord na een verzendbewerking. Wordt alleen van kracht wanneer een synchronisatieproducent is ingeschakeld.
Geavanceerde configuratie van producent

De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke binderproducent, die u kunt configureren met het voorvoegsel spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Basisgebruik

Berichten verzenden en ontvangen van/naar Event Hubs

  1. Vul de configuratieopties in met referentiegegevens.

    • Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  • Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Leverancier en consument definiëren.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Ondersteuning voor partitionering

Er wordt een PartitionSupplier met door de gebruiker verstrekte partitiegegevens gemaakt om de partitiegegevens te configureren over het te verzenden bericht. In het volgende stroomdiagram ziet u het proces voor het verkrijgen van verschillende prioriteiten voor de partitie-id en sleutel:

diagram met een stroomdiagram van het partitioneringsondersteuningsproces.

Ondersteuning voor Batch-consumenten

  1. Geef de batchconfiguratieopties op, zoals wordt weergegeven in het volgende voorbeeld:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Leverancier en consument definiëren.

    Voor controlepuntenmodus als BATCHkunt u de volgende code gebruiken om berichten te verzenden en in batches te gebruiken.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Voor controlepuntenmodus als MANUALkunt u de volgende code gebruiken om berichten te verzenden en/controlepunt in batches te gebruiken/te gebruiken.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Notitie

In de batchverwerkingsmodus is het standaardinhoudstype van Spring Cloud Stream-binder application/json, dus zorg ervoor dat de nettolading van het bericht is afgestemd op het inhoudstype. Wanneer u bijvoorbeeld het standaardinhoudstype van application/json gebruikt om berichten te ontvangen met String nettolading, moet de nettolading worden JSON String, omgeven door dubbele aanhalingstekens voor de oorspronkelijke String tekst. Hoewel het voor text/plain inhoudstype rechtstreeks een String object kan zijn. Zie Spring Cloud Stream-inhoudstypeonderhandelingvoor meer informatie.

Foutberichten verwerken

  • Uitgaande bindingsfoutberichten verwerken

    Spring Integration maakt standaard een globaal foutkanaal met de naam errorChannel. Configureer het volgende berichteindpunt voor het afhandelen van uitgaande bindingsfoutberichten.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Binnenkomende bindingsfoutberichten verwerken

    Spring Cloud Stream Event Hubs Binder ondersteunt één oplossing voor het afhandelen van fouten voor de binnenkomende berichtbindingen: fouthandlers.

    fouthandler:

    Spring Cloud Stream biedt een mechanisme waarmee u een aangepaste fouthandler kunt bieden door een Consumer toe te voegen die ErrorMessage exemplaren accepteert. Zie Foutberichten verwerken in de Spring Cloud Stream-documentatie voor meer informatie.

    • Binding-standaardfouthandler

      Configureer één Consumer bean om alle binnenkomende bindingsfoutberichten te gebruiken. De volgende standaardfunctie abonneert zich op elk binnenkomende bindingsfoutkanaal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      U moet ook de eigenschap spring.cloud.stream.default.error-handler-definition instellen op de naam van de functie.

    • Bindingsspecifieke fouthandler

      Configureer een Consumer bean om de specifieke binnenkomende bindingsfoutberichten te gebruiken. De volgende functie abonneert zich op het specifieke inkomende bindingsfoutkanaal en heeft een hogere prioriteit dan de standaardfouthandler voor binding:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      U moet ook de eigenschap spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition instellen op de naam van de functie.

Event Hubs-berichtkoppen

Zie de sectie Event Hubs-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor de basisberichtkoppen die worden ondersteund.

Ondersteuning voor meerdere binder

Verbinding met meerdere Event Hubs-naamruimten wordt ook ondersteund door meerdere binders te gebruiken. In dit voorbeeld wordt een verbindingsreeks gebruikt als voorbeeld. Referenties van service-principals en beheerde identiteiten worden ook ondersteund. U kunt gerelateerde eigenschappen instellen in de omgevingsinstellingen van elke binder.

  1. Als u meerdere binders wilt gebruiken met Event Hubs, configureert u de volgende eigenschappen in uw application.yml-bestand:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Notitie

    In het vorige toepassingsbestand ziet u hoe u één standaard poller voor de toepassing configureert voor alle bindingen. Als u de poller voor een specifieke binding wilt configureren, kunt u een configuratie zoals spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gebruiken.

  2. We moeten twee leveranciers en twee consumenten definiëren:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Resource-inrichting

Event Hubs-binder biedt ondersteuning voor het inrichten van Event Hub en consumentengroep. Gebruikers kunnen de volgende eigenschappen gebruiken om het inrichten in te schakelen.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

Monsters

Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.

Spring Cloud Stream Binder voor Azure Service Bus

Sleutelbegrippen

Spring Cloud Stream Binder voor Azure Service Bus biedt de bindende implementatie voor het Spring Cloud Stream Framework. Deze implementatie maakt gebruik van Spring Integration Service Bus-kanaaladapters bij de basis.

Gepland bericht

Deze binder ondersteunt het verzenden van berichten naar een onderwerp voor vertraagde verwerking. Gebruikers kunnen geplande berichten verzenden met kopteksten x-delay in milliseconden een vertragingstijd voor het bericht uitdrukken. Het bericht wordt na x-delay milliseconden aan de desbetreffende onderwerpen bezorgd.

Consumentengroep

Service Bus-onderwerp biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica. Deze binder is afhankelijk van Subscription van een onderwerp om te fungeren als een consumentengroep.

Afhankelijkheid instellen

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

U kunt ook de Spring Cloud Azure Stream Service Bus Starter gebruiken, zoals wordt weergegeven in het volgende voorbeeld voor Maven:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuratie

De binder biedt de volgende twee onderdelen van configuratieopties:

Eigenschappen van verbindingsconfiguratie

Deze sectie bevat de configuratieopties die worden gebruikt om verbinding te maken met Azure Service Bus.

Notitie

Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.

Configureerbare verbindingseigenschappen van spring-cloud-azure-stream-binder-servicebus:

Eigenschap Type Beschrijving
spring.cloud.azure.servicebus.enabled booleaans Of een Azure Service Bus is ingeschakeld.
spring.cloud.azure.servicebus.connection-string Snaar Service Bus-naamruimteverbindingsreekswaarde.
spring.cloud.azure.servicebus.custom-endpoint-address Snaar Het aangepaste eindpuntadres dat moet worden gebruikt bij het maken van verbinding met Service Bus.
spring.cloud.azure.servicebus.namespace Snaar Service Bus-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Snaar Domeinnaam van een Azure Service Bus-naamruimtewaarde.

Notitie

Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor de Spring Cloud Azure Stream Service Bus-binder. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure. of het voorvoegsel van spring.cloud.azure.servicebus..

De binder ondersteunt ook standaard Spring Could Azure Resource Manager-. Zie het gedeelte Data van Spring Could Azure Resource Managervoor meer informatie over het ophalen van de verbindingsreeks met beveiligingsprinciplen die niet worden verleend met gerelateerde rollen.

Configuratie-eigenschappen van Azure Service Bus-binding

De volgende opties zijn onderverdeeld in vier secties: Consumer Properties, Advanced Consumer Configurations, Producer Properties en Advanced Producer Configurations.

Consumenteneigenschappen

Deze eigenschappen worden weergegeven via ServiceBusConsumerProperties.

Notitie

Om herhaling te voorkomen, biedt Spring Cloud Azure Stream Binder Service Bus ondersteuning voor het instellen van waarden voor alle kanalen, in de indeling van spring.cloud.stream.servicebus.default.consumer.<property>=<value>.

Configureerbare eigenschappen van spring-cloud-azure-stream-binder-servicebus:

Eigenschap Type Verstek Beschrijving
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booleaans vals Als de mislukte berichten worden doorgestuurd naar de DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Geheel getal 1 Maximaal aantal gelijktijdige berichten dat de Service Bus-processorclient moet verwerken. Wanneer sessie is ingeschakeld, is deze van toepassing op elke sessie.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-gelijktijdige sessies Geheel getal nul Maximum aantal gelijktijdige sessies dat op een bepaald moment moet worden verwerkt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Booleaans nul Of sessie is ingeschakeld.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Geheel getal 0 Het aantal prefetchs van de Service Bus-processorclient.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.subwachtrij Subqueue geen Het type subwachtrij waarmee verbinding moet worden gemaakt.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Duur 5 min. De hoeveelheid tijd die nodig is om door te gaan met automatisch vernieuwen van de vergrendeling.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock De ontvangstmodus van de Service Bus-processorclient.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Booleaans waar Of berichten automatisch moeten worden vereffend. Als deze optie is ingesteld als onwaar, wordt er een berichtkop van Checkpointer toegevoegd om ontwikkelaars in staat te stellen berichten handmatig te vereffenen.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes Lang 1024 De maximale grootte van de wachtrij/het onderwerp in megabytes. Dit is de grootte van het geheugen dat is toegewezen voor de wachtrij/het onderwerp.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live Duur P10675199DT2H48M5.4775807S. (10675199 dagen, 2 uur, 48 minuten, 5 seconden en 477 milliseconden) De duur waarna het bericht verloopt, beginnend vanaf het moment dat het bericht naar Service Bus wordt verzonden.

Belangrijk

Wanneer u de Azure Resource Manager- (ARM) gebruikt, moet u de eigenschap spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type configureren. Zie het voorbeeld servicebus-queue-binder-arm op GitHub voor meer informatie.

Geavanceerde consumentenconfiguratie

De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke bindergebruiker, die u kunt configureren met het voorvoegsel spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Producenteigenschappen

Deze eigenschappen worden weergegeven via ServiceBusProducerProperties.

Notitie

Om herhaling te voorkomen, biedt Spring Cloud Azure Stream Binder Service Bus ondersteuning voor het instellen van waarden voor alle kanalen, in de indeling van spring.cloud.stream.servicebus.default.producer.<property>=<value>.

Configureerbare eigenschappen van spring-cloud-azure-stream-binder-servicebus:

Eigenschap Type Verstek Beschrijving
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booleaans vals Schakelvlag voor synchronisatie van producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout lang 10000 Time-outwaarde voor het verzenden van producent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType nul Service Bus-entiteitstype van de producent, vereist voor de bindingsproducent.
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes Lang 1024 De maximale grootte van de wachtrij/het onderwerp in megabytes. Dit is de grootte van het geheugen dat is toegewezen voor de wachtrij/het onderwerp.
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live Duur P10675199DT2H48M5.4775807S. (10675199 dagen, 2 uur, 48 minuten, 5 seconden en 477 milliseconden) De duur waarna het bericht verloopt, beginnend vanaf het moment dat het bericht naar Service Bus wordt verzonden.

Belangrijk

Wanneer u de bindingsproducent gebruikt, moet de eigenschap van spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type worden geconfigureerd.

Geavanceerde configuratie van producent

De bovenstaande verbinding en algemene azure SDK-client configuratieondersteuning voor elke binderproducent, die u kunt configureren met het voorvoegsel spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Basisgebruik

Berichten verzenden en ontvangen van/naar Service Bus

  1. Vul de configuratieopties in met referentiegegevens.

    • Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  • Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Leverancier en consument definiëren.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Ondersteuning voor partitiesleutels

De binder ondersteunt Service Bus-partitionering door het instellen van de partitiesleutel en sessie-id in de berichtkop. In deze sectie wordt uitgelegd hoe u een partitiesleutel instelt voor berichten.

Spring Cloud Stream biedt een spEL-expressieeigenschap voor partitiesleutels spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression. Stel deze eigenschap bijvoorbeeld in als "'partitionKey-' + headers[<message-header-key>]" en voeg een koptekst toe met de naam message-header-key. Spring Cloud Stream gebruikt de waarde voor deze header bij het evalueren van de expressie om een partitiesleutel toe te wijzen. De volgende code bevat een voorbeeldproducent:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Sessieondersteuning

De binder ondersteunt berichtsessies van Service Bus. Sessie-id van een bericht kan worden ingesteld via de berichtkop.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Notitie

Volgens Service Bus-partitioneringheeft de sessie-id een hogere prioriteit dan de partitiesleutel. Dus wanneer zowel ServiceBusMessageHeaders#SESSION_ID als ServiceBusMessageHeaders#PARTITION_KEY headers zijn ingesteld, wordt de waarde van de sessie-id uiteindelijk gebruikt om de waarde van de partitiesleutel te overschrijven.

Foutberichten verwerken

  • Uitgaande bindingsfoutberichten verwerken

    Spring Integration maakt standaard een globaal foutkanaal met de naam errorChannel. Configureer het volgende berichteindpunt om het foutbericht voor uitgaande binding af te handelen.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Binnenkomende bindingsfoutberichten verwerken

    Spring Cloud Stream Service Bus Binder ondersteunt twee oplossingen voor het afhandelen van fouten voor de binnenkomende berichtbindingen: de handler en handlers voor binderfouten.

    Binder-fouthandler:

    De standaardfouthandler voor binder verwerkt de binnenkomende binding. U gebruikt deze handler om mislukte berichten naar de wachtrij met onbestelbare berichten te verzenden wanneer spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected is ingeschakeld. Anders worden de mislukte berichten afgelaten. De handler voor binderfouten sluiten elkaar wederzijds uit met andere meegeleverde fouthandlers.

    fouthandler:

    Spring Cloud Stream biedt een mechanisme waarmee u een aangepaste fouthandler kunt bieden door een Consumer toe te voegen die ErrorMessage exemplaren accepteert. Zie Foutberichten verwerken in de Spring Cloud Stream-documentatie voor meer informatie.

    • Binding-standaardfouthandler

      Configureer één Consumer bean om alle binnenkomende bindingsfoutberichten te gebruiken. De volgende standaardfunctie abonneert zich op elk binnenkomende bindingsfoutkanaal:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      U moet ook de eigenschap spring.cloud.stream.default.error-handler-definition instellen op de naam van de functie.

    • Bindingsspecifieke fouthandler

      Configureer een Consumer bean om de specifieke binnenkomende bindingsfoutberichten te gebruiken. De volgende functie abonneert zich op het specifieke inkomende bindingsfoutkanaal met een hogere prioriteit dan de standaardfouthandler voor bindingen.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      U moet ook de eigenschap spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition instellen op de naam van de functie.

Service Bus-berichtkoppen

Zie de sectie Service Bus-berichtkoppen sectie van Spring Cloud Azure-ondersteuning voor Spring Integrationvoor de basisberichtkoppen die worden ondersteund.

Notitie

Bij het instellen van de partitiesleutel is de prioriteit van de berichtkop hoger dan de eigenschap Spring Cloud Stream. spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression wordt dus alleen van kracht wanneer geen van de ServiceBusMessageHeaders#SESSION_ID en ServiceBusMessageHeaders#PARTITION_KEY headers zijn geconfigureerd.

Ondersteuning voor meerdere binder

Verbinding met meerdere Service Bus-naamruimten wordt ook ondersteund door meerdere binders te gebruiken. In dit voorbeeld wordt een verbindingsreeks gebruikt. Referenties van service-principals en beheerde identiteiten worden ook ondersteund. Gebruikers kunnen gerelateerde eigenschappen instellen in de omgevingsinstellingen van elke binder.

  1. Als u meerdere binders van ServiceBus wilt gebruiken, configureert u de volgende eigenschappen in uw application.yml-bestand:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Notitie

    In het vorige toepassingsbestand ziet u hoe u één standaard poller voor de toepassing configureert voor alle bindingen. Als u de poller voor een specifieke binding wilt configureren, kunt u een configuratie zoals spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000gebruiken.

  2. we moeten twee leveranciers en twee consumenten definiëren

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Resource-inrichting

Service Bus Binder biedt ondersteuning voor het inrichten van wachtrijen, onderwerpen en abonnementen. Gebruikers kunnen de volgende eigenschappen gebruiken om het inrichten in te schakelen.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

Eigenschappen van Service Bus-client aanpassen

Ontwikkelaars kunnen AzureServiceClientBuilderCustomizer gebruiken om eigenschappen van de Service Bus-client aan te passen. In het volgende voorbeeld wordt de eigenschap sessionIdleTimeout in ServiceBusClientBuilderaangepast:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Monsters

Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.