Delen via


Spring Cloud Azure-ondersteuning voor Spring Integration

Dit artikel is van toepassing op:✅ versie 4.19.0 ✅ versie 5.19.0

Spring Integration Extension voor Azure biedt Spring Integration-adapters voor de verschillende services die worden geleverd door de Azure SDK voor Java. We bieden Spring Integration-ondersteuning voor deze Azure-services: Event Hubs, Service Bus, Opslagwachtrij. Hier volgt een lijst met ondersteunde adapters:

Spring-integratie met Azure Event Hubs

Sleutelbegrippen

Azure Event Hubs is een streamingplatform voor big data en een service voor gebeurtenisopname. Het kan miljoenen gebeurtenissen per seconde ontvangen en verwerken. Gegevens die naar een Event Hub worden verzonden, kunnen worden getransformeerd en opgeslagen met behulp van een realtime analyseprovider of batchverwerking/opslagadapters.

Spring Integration maakt lichtgewicht berichten mogelijk binnen Spring-toepassingen en ondersteunt integratie met externe systemen via declaratieve adapters. Deze adapters bieden een hoger abstractieniveau ten opzichte van de ondersteuning van Spring voor externe communicatie, berichten en planning. Het Spring Integration voor Event Hubs-extensieproject biedt binnenkomende en uitgaande kanaaladapters en gateways voor Azure Event Hubs.

Notitie

RxJava-ondersteunings-API's worden verwijderd van versie 4.0.0. Zie Javadoc voor meer informatie.

Consumentengroep

Event Hubs biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica. Hoewel Kafka alle vastgelegde offsets in de broker opslaat, moet u offsets opslaan van Event Hubs-berichten die handmatig worden verwerkt. Event Hubs SDK biedt de functie voor het opslaan van dergelijke offsets in Azure Storage.

Ondersteuning voor partitionering

Event Hubs biedt een vergelijkbaar concept van fysieke partitie als Kafka. Maar in tegenstelling tot de automatische herverdeling van Kafka tussen consumenten en partities, biedt Event Hubs een soort preventieve modus. Het opslagaccount fungeert als een lease om te bepalen welke partitie eigendom is van welke consument. Wanneer een nieuwe consument begint, wordt geprobeerd om enkele partities te stelen van de meeste zwaar belaste consumenten om de workloadverdeling te bereiken.

Ontwikkelaars kunnen EventHubsContainerProperties voor de configuratie gebruiken om de taakverdelingsstrategie op te geven. Zie de volgende sectie voor een voorbeeld van het configureren van EventHubsContainerProperties.

Ondersteuning voor Batch-consumenten

De EventHubsInboundChannelAdapter ondersteunt de batchverwerkingsmodus. Om deze functie in te schakelen, kunnen gebruikers de listenermodus opgeven als ListenerMode.BATCH bij het maken van een EventHubsInboundChannelAdapter exemplaar. Wanneer deze optie is ingeschakeld, wordt een Message waarvan de nettolading een lijst met batchgebeurtenissen is ontvangen en doorgegeven aan het downstreamkanaal. Elke berichtkop wordt ook geconverteerd als een lijst, waarvan de inhoud de bijbehorende headerwaarde is die van elke gebeurtenis wordt geparseerd. Voor de gemeenschappelijke headers van partitie-id, controlepunt en laatste enqueued eigenschappen, worden ze weergegeven als één waarde voor de hele batch gebeurtenissen dezelfde. Zie de sectie Event Hubs-berichtkoppen voor meer informatie.

Notitie

De controlepuntkop bestaat alleen wanneer HANDMATIGe controlepuntmodus wordt gebruikt.

Controlepunten van batchconsumer ondersteunen twee modi: BATCH en MANUAL. BATCH-modus is een modus voor automatische controlepunten om de hele batch gebeurtenissen samen te controleren zodra ze zijn ontvangen. MANUAL modus is het controleren van de gebeurtenissen door gebruikers. Wanneer dit wordt gebruikt, wordt de Checkpointer- doorgegeven aan de berichtkop en kunnen gebruikers deze gebruiken om controlepunten uit te voeren.

Het beleid voor batchverwerking kan worden opgegeven door eigenschappen van max-size en max-wait-time, waarbij max-size een vereiste eigenschap is terwijl max-wait-time optioneel is. Ontwikkelaars kunnen EventHubsContainerProperties voor de configuratie gebruiken om de strategie voor batchgebruik op te geven. Zie de volgende sectie voor een voorbeeld van het configureren van EventHubsContainerProperties.

Afhankelijkheid instellen

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuratie

Deze starter biedt de volgende drie onderdelen van configuratieopties:

Eigenschappen van verbindingsconfiguratie

Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Event Hubs.

Notitie

Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.

Configureerbare eigenschappen van spring-cloud-azure-starter-integration-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.azure.eventhubs.enabled booleaans Of een Azure Event Hubs is ingeschakeld.
spring.cloud.azure.eventhubs.connection-string Snaar Waarde van verbindingsreeks voor Event Hubs-naamruimte.
spring.cloud.azure.eventhubs.namespace Snaar Event Hubs-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Snaar Domeinnaam van een Azure Event Hubs-naamruimtewaarde.
spring.cloud.azure.eventhubs.custom-endpoint-address Snaar Aangepast eindpuntadres.
spring.cloud.azure.eventhubs.shared-connection Booleaans Of de onderliggende EventProcessorClient en EventHubProducerAsyncClient dezelfde verbinding gebruiken. Standaard wordt er een nieuwe verbinding gemaakt en gebruikt voor elke Gemaakte Event Hub-client.

Eigenschappen van controlepuntconfiguratie

Deze sectie bevat de configuratieopties voor de Storage Blobs-service, die wordt gebruikt voor het behouden van het eigendom van de partitie en controlepuntgegevens.

Notitie

Wanneer vanaf versie 4.0.0 de eigenschap van spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists is niet handmatig is ingeschakeld, wordt er geen opslagcontainer automatisch gemaakt.

Controlepunt configureerbare eigenschappen van spring-cloud-azure-starter-integration-eventhubs:

Eigenschap Type Beschrijving
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleaans Of u containers wilt maken als deze niet bestaat.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Snaar Naam voor het opslagaccount.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Snaar Toegangssleutel voor opslagaccount.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Snaar Naam van opslagcontainer.

Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor opslagblobcontrolepuntopslag. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure. of het voorvoegsel van spring.cloud.azure.eventhubs.processor.checkpoint-store.

Configuratie-eigenschappen van Event Hub-processor

De EventHubsInboundChannelAdapter gebruikt de EventProcessorClient om berichten van een Event Hub te gebruiken om de algemene eigenschappen van een EventProcessorClientte configureren. Ontwikkelaars kunnen EventHubsContainerProperties gebruiken voor de configuratie. Zie de volgende sectie over het werken met EventHubsInboundChannelAdapter.

Basisgebruik

Berichten verzenden naar Azure Event Hubs

  1. Vul de configuratieopties voor referenties in.

    • Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  1. Maak DefaultMessageHandler met de EventHubsTemplate bean om berichten naar Event Hubs te verzenden.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Maak een berichtgatewaybinding met de bovenstaande berichthandler via een berichtkanaal.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Berichten verzenden met behulp van de gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Berichten ontvangen van Azure Event Hubs

  1. Vul de configuratieopties voor referenties in.

  2. Maak een san van het berichtkanaal als invoerkanaal.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Maak EventHubsInboundChannelAdapter met de EventHubsMessageListenerContainer bean om berichten van Event Hubs te ontvangen.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Maak een berichtontvangerbinding met EventHubsInboundChannelAdapter via het berichtkanaal dat eerder is gemaakt.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

EventHubsMessageConverter configureren om objectMapper aan te passen

EventHubsMessageConverter wordt gemaakt als configureerbare bean, zodat gebruikers ObjectMapper kunnen aanpassen.

Ondersteuning voor Batch-consumenten

Als u berichten van Event Hubs in batches wilt gebruiken, is vergelijkbaar met het bovenstaande voorbeeld, moeten gebruikers naast gebruikers de gerelateerde configuratieopties voor batchverwerking instellen voor EventHubsInboundChannelAdapter.

Wanneer u EventHubsInboundChannelAdaptermaakt, moet de listenermodus worden ingesteld als BATCH. Wanneer u een bean van EventHubsMessageListenerContainermaakt, stelt u de controlepuntmodus in als MANUAL of BATCHen kunt u de batchopties zo nodig configureren.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs-berichtkoppen

In de volgende tabel ziet u hoe de eigenschappen van Event Hubs-berichten worden toegewezen aan Spring-berichtkoppen. Voor Azure Event Hubs wordt het bericht aangeroepen als event.

Toewijzing tussen Event Hubs-bericht-/gebeurteniseigenschappen en Spring Message-headers in recordlistenermodus:

Event Hubs-gebeurteniseigenschappen Spring Message Header Constanten Type Beschrijving
Tijd van enquête EventHubsHeaders#ENQUEUED_TIME Oogwenk Het moment, in UTC, van toen de gebeurtenis werd geïnventueerd in de Event Hub-partitie.
Afstand EventHubsHeaders#OFFSET Lang De offset van de gebeurtenis toen deze werd ontvangen van de bijbehorende Event Hub-partitie.
Partitiesleutel AzureHeaders#PARTITION_KEY Snaar De partitie-hashsleutel als deze is ingesteld bij het oorspronkelijk publiceren van de gebeurtenis.
Partitie-id AzureHeaders#RAW_PARTITION_ID Snaar De partitie-id van de Event Hub.
Volgnummer EventHubsHeaders#SEQUENCE_NUMBER Lang Het volgnummer dat is toegewezen aan de gebeurtenis toen deze werd gevraagd in de bijbehorende Event Hub-partitie.
Laatst geïnventueerde gebeurteniseigenschappen EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties De eigenschappen van de laatste enqueuedgebeurtenis in deze partitie.
NA AzureHeaders#CHECKPOINTER Controlepunt De koptekst voor het controlepunt van het specifieke bericht.

Gebruikers kunnen de berichtkoppen parseren voor de gerelateerde informatie van elke gebeurtenis. Als u een berichtkop voor de gebeurtenis wilt instellen, worden alle aangepaste headers als een toepassingseigenschap van een gebeurtenis geplaatst, waarbij de header als eigenschapssleutel wordt ingesteld. Wanneer gebeurtenissen worden ontvangen van Event Hubs, worden alle toepassingseigenschappen geconverteerd naar de berichtkop.

Notitie

Berichtkoppen van partitiesleutel, ge enqueued tijd, offset en volgnummer worden niet ondersteund om handmatig in te stellen.

Wanneer de batchconsumermodus is ingeschakeld, worden de specifieke headers van batchberichten als volgt weergegeven. Deze bevat een lijst met waarden van elke Event Hubs-gebeurtenis.

Toewijzing tussen Event Hubs-bericht-/gebeurteniseigenschappen en Spring Message-headers in batch-listenermodus:

Event Hubs-gebeurteniseigenschappen Spring Batch-berichtkopconstanten Type Beschrijving
Tijd van enquête EventHubsHeaders#ENQUEUED_TIME Lijst met direct Lijst van de directe, in UTC, van wanneer elke gebeurtenis is geïnventareerd in de Event Hub-partitie.
Afstand EventHubsHeaders#OFFSET Lijst met lange Lijst van de offset van elke gebeurtenis wanneer deze is ontvangen van de bijbehorende Event Hub-partitie.
Partitiesleutel AzureHeaders#PARTITION_KEY Lijst met tekenreeksen Lijst met de partitie-hashsleutel als deze is ingesteld bij het oorspronkelijk publiceren van elke gebeurtenis.
Volgnummer EventHubsHeaders#SEQUENCE_NUMBER Lijst met lange Lijst met het volgnummer dat aan elke gebeurtenis is toegewezen toen deze werd gevraagt in de bijbehorende Event Hub-partitie.
Systeemeigenschappen EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lijst met kaart Lijst met de systeemeigenschappen van elke gebeurtenis.
Toepassingseigenschappen EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lijst met kaart Lijst met de toepassingseigenschappen van elke gebeurtenis, waarbij alle aangepaste berichtkoppen of gebeurteniseigenschappen worden geplaatst.

Notitie

Wanneer u berichten publiceert, worden alle bovenstaande batchheaders verwijderd uit de berichten, indien aanwezig.

Monsters

Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.

Spring-integratie met Azure Service Bus

Sleutelbegrippen

Spring Integration maakt lichtgewicht berichten mogelijk binnen Spring-toepassingen en ondersteunt integratie met externe systemen via declaratieve adapters.

Het Spring Integration for Azure Service Bus-extensieproject biedt binnenkomende en uitgaande kanaaladapters voor Azure Service Bus.

Notitie

CompleteFuture-ondersteunings-API's zijn afgeschaft van versie 2.10.0 en vervangen door Reactor Core van versie 4.0.0. Zie Javadoc voor meer informatie.

Afhankelijkheid instellen

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuratie

Deze starter biedt de volgende twee onderdelen van configuratieopties:

Eigenschappen van verbindingsconfiguratie

Deze sectie bevat de configuratieopties die worden gebruikt om verbinding te maken met Azure Service Bus.

Notitie

Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.

Configureerbare eigenschappen van spring-cloud-azure-starter-integration-servicebus:

Eigenschap Type Beschrijving
spring.cloud.azure.servicebus.enabled booleaans Of een Azure Service Bus is ingeschakeld.
spring.cloud.azure.servicebus.connection-string Snaar Service Bus-naamruimteverbindingsreekswaarde.
spring.cloud.azure.servicebus.custom-endpoint-address Snaar Het aangepaste eindpuntadres dat moet worden gebruikt bij het maken van verbinding met Service Bus.
spring.cloud.azure.servicebus.namespace Snaar Service Bus-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Snaar Domeinnaam van een Azure Service Bus-naamruimtewaarde.

Configuratie-eigenschappen van Service Bus-processor

De ServiceBusInboundChannelAdapter gebruikt de ServiceBusProcessorClient om berichten te gebruiken, om de algemene eigenschappen van een ServiceBusProcessorClientte configureren, kunnen ontwikkelaars ServiceBusContainerProperties gebruiken voor de configuratie. Zie de volgende sectie over het werken met ServiceBusInboundChannelAdapter.

Basisgebruik

Berichten verzenden naar Azure Service Bus

  1. Vul de configuratieopties voor referenties in.

    • Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  • Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  1. Maak DefaultMessageHandler met de ServiceBusTemplate bean om berichten naar Service Bus te verzenden, stel het entiteitstype voor de ServiceBusTemplate in. In dit voorbeeld wordt Service Bus Queue gebruikt als voorbeeld.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Maak een berichtgatewaybinding met de bovenstaande berichthandler via een berichtkanaal.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Berichten verzenden met behulp van de gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Berichten ontvangen van Azure Service Bus

  1. Vul de configuratieopties voor referenties in.

  2. Maak een san van het berichtkanaal als invoerkanaal.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Maak ServiceBusInboundChannelAdapter met de ServiceBusMessageListenerContainer bean om berichten naar Service Bus te ontvangen. In dit voorbeeld wordt Service Bus Queue gebruikt als voorbeeld.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Maak een berichtontvangerbinding met ServiceBusInboundChannelAdapter via het berichtkanaal dat we eerder hebben gemaakt.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

ServiceBusMessage Converter configureren om objectMapper aan te passen

ServiceBusMessageConverter wordt gemaakt als configureerbare bean, zodat gebruikers ObjectMapperkunnen aanpassen.

Service Bus-berichtkoppen

Voor sommige Service Bus-headers die kunnen worden toegewezen aan meerdere Spring-headerconstanten, wordt de prioriteit van verschillende Spring-headers vermeld.

Toewijzing tussen Service Bus-headers en Spring-headers:

Service Bus-berichtkoppen en -eigenschappen Koptekstconstanten van springbericht Type Configureerbare Beschrijving
Inhoudstype MessageHeaders#CONTENT_TYPE Snaar Ja De RFC2045 inhoudstypedescriptor van het bericht.
Correlatie-id ServiceBusMessageHeaders#CORRELATION_ID Snaar Ja De correlatie-id van het bericht
Bericht-id ServiceBusMessageHeaders#MESSAGE_ID Snaar Ja De bericht-id van het bericht heeft deze koptekst een hogere prioriteit dan MessageHeaders#ID.
Bericht-id MessageHeaders#ID UUID Ja De bericht-id van het bericht heeft een lagere prioriteit dan ServiceBusMessageHeaders#MESSAGE_ID.
Partitiesleutel ServiceBusMessageHeaders#PARTITION_KEY Snaar Ja De partitiesleutel voor het verzenden van het bericht naar een gepartitioneerde entiteit.
Beantwoorden MessageHeaders#REPLY_CHANNEL Snaar Ja Het adres van een entiteit waarnaar antwoorden moeten worden verzonden.
Sessie-id beantwoorden ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Snaar Ja De eigenschapswaarde ReplyToGroupId van het bericht.
Geplande tijd in wachtrij utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Ja De datum/tijd waarop het bericht moet worden ge enqueued in Service Bus, deze kop heeft een hogere prioriteit dan AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Geplande tijd in wachtrij utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Geheel getal Ja De datum/tijd waarop het bericht in Service Bus moet worden verzonden, heeft deze koptekst een lagere prioriteit dan ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Sessie-id ServiceBusMessageHeaders#SESSION_ID Snaar Ja De sessie-IDentifier voor een sessiebewuste entiteit.
Time to live ServiceBusMessageHeaders#TIME_TO_LIVE Duur Ja De duur van de tijd voordat dit bericht verloopt.
Aan ServiceBusMessageHeaders#TO Snaar Ja Het 'aan'-adres van het bericht, gereserveerd voor toekomstig gebruik in routeringsscenario's en momenteel genegeerd door de broker zelf.
Onderwerp ServiceBusMessageHeaders#SUBJECT Snaar Ja Het onderwerp voor het bericht.
Beschrijving van fout met dode letter ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Snaar Nee De beschrijving van een bericht dat in een dode letter is geschreven.
Reden voor dode letter ServiceBusMessageHeaders#DEAD_LETTER_REASON Snaar Nee De reden dat een bericht dood is geschreven.
Bron van dode letter ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Snaar Nee De entiteit waarin het bericht dood is geschreven.
Aantal bezorgingen ServiceBusMessageHeaders#DELIVERY_COUNT lang Nee Het aantal keren dat dit bericht aan clients is bezorgd.
Gesemplementeerd volgnummer ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER lang Nee Het volgnummer dat is toegewezen aan een bericht door Service Bus.
Tijd van enquête ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Nee De datum/tijd waarop dit bericht is verzonden in Service Bus.
Verloopt op ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Nee De datum/tijd waarop dit bericht verloopt.
Token vergrendelen ServiceBusMessageHeaders#LOCK_TOKEN Snaar Nee Het vergrendelingstoken voor het huidige bericht.
Vergrendeld tot ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Nee De datum/tijd waarop de vergrendeling van dit bericht verloopt.
Volgnummer ServiceBusMessageHeaders#SEQUENCE_NUMBER lang Nee Het unieke nummer dat is toegewezen aan een bericht door Service Bus.
Staat ServiceBusMessageHeaders#STATE ServiceBusMessageState Nee De status van het bericht, die actief, uitgesteld of gepland kan zijn.

Ondersteuning voor partitiesleutels

Deze starter ondersteunt Service Bus-partitionering door het instellen van de partitiesleutel en sessie-id in de berichtkop. In deze sectie wordt uitgelegd hoe u een partitiesleutel instelt voor berichten.

Aanbevolen: gebruik ServiceBusMessageHeaders.PARTITION_KEY als sleutel van de koptekst.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Niet aanbevolen, maar momenteel ondersteund:AzureHeaders.PARTITION_KEY als sleutel van de header.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Notitie

Wanneer zowel ServiceBusMessageHeaders.PARTITION_KEY als AzureHeaders.PARTITION_KEY zijn ingesteld in de berichtkoppen, heeft ServiceBusMessageHeaders.PARTITION_KEY de voorkeur.

Sessieondersteuning

In dit voorbeeld ziet u hoe u de sessie-id van een bericht in de toepassing handmatig instelt.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Notitie

Wanneer de ServiceBusMessageHeaders.SESSION_ID is ingesteld in de berichtkoppen en er ook een andere ServiceBusMessageHeaders.PARTITION_KEY koptekst is ingesteld, wordt de waarde van de sessie-id uiteindelijk gebruikt om de waarde van de partitiesleutel te overschrijven.

Eigenschappen van Service Bus-client aanpassen

Ontwikkelaars kunnen AzureServiceClientBuilderCustomizer gebruiken om eigenschappen van de Service Bus-client aan te passen. In het volgende voorbeeld wordt de eigenschap sessionIdleTimeout in ServiceBusClientBuilderaangepast:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Monsters

Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.

Spring-integratie met Azure Storage Queue

Sleutelbegrippen

Azure Queue Storage is een service voor het opslaan van grote aantallen berichten. U opent berichten vanaf elke locatie ter wereld via geverifieerde aanroepen via HTTP of HTTPS. Een wachtrijbericht kan maximaal 64 kB groot zijn. Een wachtrij kan miljoenen berichten bevatten, tot de totale capaciteitslimiet van een opslagaccount. Wachtrijen worden vaak gebruikt om een achterstand van werk te maken om asynchroon te verwerken.

Afhankelijkheid instellen

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuratie

Deze starter biedt de volgende configuratieopties:

Eigenschappen van verbindingsconfiguratie

Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Storage Queue.

Notitie

Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.

Configureerbare verbindingseigenschappen van spring-cloud-azure-starter-integration-storage-queue:

Eigenschap Type Beschrijving
spring.cloud.azure.storage.queue.enabled booleaans Of een Azure Storage-wachtrij is ingeschakeld.
spring.cloud.azure.storage.queue.connection-string Snaar Verbindingsreekswaarde voor opslagwachtrijnaamruimte.
spring.cloud.azure.storage.queue.accountName Snaar Naam van opslagwachtrijaccount.
spring.cloud.azure.storage.queue.accountKey Snaar Opslagwachtrijaccountsleutel.
spring.cloud.azure.storage.queue.endpoint Snaar Service-eindpunt voor opslagwachtrij.
spring.cloud.azure.storage.queue.sasToken Snaar Sas-tokenreferenties
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion die wordt gebruikt bij het maken van API-aanvragen.
spring.cloud.azure.storage.queue.messageEncoding Snaar Wachtrijberichtcodering.

Basisgebruik

Berichten verzenden naar Azure Storage Queue

  1. Vul de configuratieopties voor referenties in.

    • Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  • Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Notitie

De waarden die zijn toegestaan voor tenant-id zijn: common, organizations, consumersof de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.

  1. Maak DefaultMessageHandler met de StorageQueueTemplate bean om berichten naar de opslagwachtrij te verzenden.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Maak een Message Gateway-binding met de bovenstaande berichthandler via een berichtkanaal.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Berichten verzenden met behulp van de gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Berichten ontvangen van Azure Storage Queue

  1. Vul de configuratieopties voor referenties in.

  2. Maak een san van het berichtkanaal als invoerkanaal.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Maak StorageQueueMessageSource met de StorageQueueTemplate bean voor het ontvangen van berichten naar de opslagwachtrij.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Maak een berichtontvangerbinding met StorageQueueMessageSource die in de laatste stap is gemaakt via het berichtkanaal dat we eerder hebben gemaakt.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Monsters

Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.