Spring Cloud Azure-ondersteuning voor Spring Integration
Dit artikel is van toepassing op:✅ versie 4.19.0 ✅ versie 5.19.0
Spring Integration Extension voor Azure biedt Spring Integration-adapters voor de verschillende services die worden geleverd door de Azure SDK voor Java. We bieden Spring Integration-ondersteuning voor deze Azure-services: Event Hubs, Service Bus, Opslagwachtrij. Hier volgt een lijst met ondersteunde adapters:
-
spring-cloud-azure-starter-integration-eventhubs
: zie Spring Integration met Azure Event Hubs voor meer informatie -
spring-cloud-azure-starter-integration-servicebus
: zie Spring Integration met Azure Service Bus voor meer informatie -
spring-cloud-azure-starter-integration-storage-queue
: zie Spring Integration met Azure Storage Queue voor meer informatie
Spring-integratie met Azure Event Hubs
Sleutelbegrippen
Azure Event Hubs is een streamingplatform voor big data en een service voor gebeurtenisopname. Het kan miljoenen gebeurtenissen per seconde ontvangen en verwerken. Gegevens die naar een Event Hub worden verzonden, kunnen worden getransformeerd en opgeslagen met behulp van een realtime analyseprovider of batchverwerking/opslagadapters.
Spring Integration maakt lichtgewicht berichten mogelijk binnen Spring-toepassingen en ondersteunt integratie met externe systemen via declaratieve adapters. Deze adapters bieden een hoger abstractieniveau ten opzichte van de ondersteuning van Spring voor externe communicatie, berichten en planning. Het Spring Integration voor Event Hubs-extensieproject biedt binnenkomende en uitgaande kanaaladapters en gateways voor Azure Event Hubs.
Notitie
RxJava-ondersteunings-API's worden verwijderd van versie 4.0.0. Zie Javadoc voor meer informatie.
Consumentengroep
Event Hubs biedt vergelijkbare ondersteuning voor consumentengroepen als Apache Kafka, maar met iets andere logica. Hoewel Kafka alle vastgelegde offsets in de broker opslaat, moet u offsets opslaan van Event Hubs-berichten die handmatig worden verwerkt. Event Hubs SDK biedt de functie voor het opslaan van dergelijke offsets in Azure Storage.
Ondersteuning voor partitionering
Event Hubs biedt een vergelijkbaar concept van fysieke partitie als Kafka. Maar in tegenstelling tot de automatische herverdeling van Kafka tussen consumenten en partities, biedt Event Hubs een soort preventieve modus. Het opslagaccount fungeert als een lease om te bepalen welke partitie eigendom is van welke consument. Wanneer een nieuwe consument begint, wordt geprobeerd om enkele partities te stelen van de meeste zwaar belaste consumenten om de workloadverdeling te bereiken.
Ontwikkelaars kunnen EventHubsContainerProperties
voor de configuratie gebruiken om de taakverdelingsstrategie op te geven. Zie de volgende sectie voor een voorbeeld van het configureren van EventHubsContainerProperties
.
Ondersteuning voor Batch-consumenten
De EventHubsInboundChannelAdapter
ondersteunt de batchverwerkingsmodus. Om deze functie in te schakelen, kunnen gebruikers de listenermodus opgeven als ListenerMode.BATCH
bij het maken van een EventHubsInboundChannelAdapter
exemplaar.
Wanneer deze optie is ingeschakeld, wordt een Message waarvan de nettolading een lijst met batchgebeurtenissen is ontvangen en doorgegeven aan het downstreamkanaal. Elke berichtkop wordt ook geconverteerd als een lijst, waarvan de inhoud de bijbehorende headerwaarde is die van elke gebeurtenis wordt geparseerd. Voor de gemeenschappelijke headers van partitie-id, controlepunt en laatste enqueued eigenschappen, worden ze weergegeven als één waarde voor de hele batch gebeurtenissen dezelfde. Zie de sectie Event Hubs-berichtkoppen voor meer informatie.
Notitie
De controlepuntkop bestaat alleen wanneer HANDMATIGe controlepuntmodus wordt gebruikt.
Controlepunten van batchconsumer ondersteunen twee modi: BATCH
en MANUAL
.
BATCH
-modus is een modus voor automatische controlepunten om de hele batch gebeurtenissen samen te controleren zodra ze zijn ontvangen.
MANUAL
modus is het controleren van de gebeurtenissen door gebruikers. Wanneer dit wordt gebruikt, wordt de Checkpointer- doorgegeven aan de berichtkop en kunnen gebruikers deze gebruiken om controlepunten uit te voeren.
Het beleid voor batchverwerking kan worden opgegeven door eigenschappen van max-size
en max-wait-time
, waarbij max-size
een vereiste eigenschap is terwijl max-wait-time
optioneel is.
Ontwikkelaars kunnen EventHubsContainerProperties
voor de configuratie gebruiken om de strategie voor batchgebruik op te geven. Zie de volgende sectie voor een voorbeeld van het configureren van EventHubsContainerProperties
.
Afhankelijkheid instellen
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configuratie
Deze starter biedt de volgende drie onderdelen van configuratieopties:
Eigenschappen van verbindingsconfiguratie
Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Event Hubs.
Notitie
Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.
Configureerbare eigenschappen van spring-cloud-azure-starter-integration-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booleaans | Of een Azure Event Hubs is ingeschakeld. |
spring.cloud.azure.eventhubs.connection-string | Snaar | Waarde van verbindingsreeks voor Event Hubs-naamruimte. |
spring.cloud.azure.eventhubs.namespace | Snaar | Event Hubs-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Snaar | Domeinnaam van een Azure Event Hubs-naamruimtewaarde. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Snaar | Aangepast eindpuntadres. |
spring.cloud.azure.eventhubs.shared-connection | Booleaans | Of de onderliggende EventProcessorClient en EventHubProducerAsyncClient dezelfde verbinding gebruiken. Standaard wordt er een nieuwe verbinding gemaakt en gebruikt voor elke Gemaakte Event Hub-client. |
Eigenschappen van controlepuntconfiguratie
Deze sectie bevat de configuratieopties voor de Storage Blobs-service, die wordt gebruikt voor het behouden van het eigendom van de partitie en controlepuntgegevens.
Notitie
Wanneer vanaf versie 4.0.0 de eigenschap van spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists is niet handmatig is ingeschakeld, wordt er geen opslagcontainer automatisch gemaakt.
Controlepunt configureerbare eigenschappen van spring-cloud-azure-starter-integration-eventhubs:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booleaans | Of u containers wilt maken als deze niet bestaat. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | Snaar | Naam voor het opslagaccount. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | Snaar | Toegangssleutel voor opslagaccount. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | Snaar | Naam van opslagcontainer. |
Algemene configuratieopties voor Azure Service SDK kunnen ook worden geconfigureerd voor opslagblobcontrolepuntopslag. De ondersteunde configuratieopties worden geïntroduceerd in Spring Cloud Azure-configuratieen kunnen worden geconfigureerd met het geïntegreerde voorvoegsel spring.cloud.azure.
of het voorvoegsel van spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Configuratie-eigenschappen van Event Hub-processor
De EventHubsInboundChannelAdapter
gebruikt de EventProcessorClient
om berichten van een Event Hub te gebruiken om de algemene eigenschappen van een EventProcessorClient
te configureren. Ontwikkelaars kunnen EventHubsContainerProperties
gebruiken voor de configuratie. Zie de volgende sectie over het werken met EventHubsInboundChannelAdapter
.
Basisgebruik
Berichten verzenden naar Azure Event Hubs
Vul de configuratieopties voor referenties in.
Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Maak
DefaultMessageHandler
met deEventHubsTemplate
bean om berichten naar Event Hubs te verzenden.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Maak een berichtgatewaybinding met de bovenstaande berichthandler via een berichtkanaal.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Berichten verzenden met behulp van de gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Berichten ontvangen van Azure Event Hubs
Vul de configuratieopties voor referenties in.
Maak een san van het berichtkanaal als invoerkanaal.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Maak
EventHubsInboundChannelAdapter
met deEventHubsMessageListenerContainer
bean om berichten van Event Hubs te ontvangen.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Maak een berichtontvangerbinding met EventHubsInboundChannelAdapter via het berichtkanaal dat eerder is gemaakt.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
EventHubsMessageConverter configureren om objectMapper aan te passen
EventHubsMessageConverter
wordt gemaakt als configureerbare bean, zodat gebruikers ObjectMapper kunnen aanpassen.
Ondersteuning voor Batch-consumenten
Als u berichten van Event Hubs in batches wilt gebruiken, is vergelijkbaar met het bovenstaande voorbeeld, moeten gebruikers naast gebruikers de gerelateerde configuratieopties voor batchverwerking instellen voor EventHubsInboundChannelAdapter
.
Wanneer u EventHubsInboundChannelAdapter
maakt, moet de listenermodus worden ingesteld als BATCH
. Wanneer u een bean van EventHubsMessageListenerContainer
maakt, stelt u de controlepuntmodus in als MANUAL
of BATCH
en kunt u de batchopties zo nodig configureren.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Event Hubs-berichtkoppen
In de volgende tabel ziet u hoe de eigenschappen van Event Hubs-berichten worden toegewezen aan Spring-berichtkoppen. Voor Azure Event Hubs wordt het bericht aangeroepen als event
.
Toewijzing tussen Event Hubs-bericht-/gebeurteniseigenschappen en Spring Message-headers in recordlistenermodus:
Event Hubs-gebeurteniseigenschappen | Spring Message Header Constanten | Type | Beschrijving |
---|---|---|---|
Tijd van enquête | EventHubsHeaders#ENQUEUED_TIME | Oogwenk | Het moment, in UTC, van toen de gebeurtenis werd geïnventueerd in de Event Hub-partitie. |
Afstand | EventHubsHeaders#OFFSET | Lang | De offset van de gebeurtenis toen deze werd ontvangen van de bijbehorende Event Hub-partitie. |
Partitiesleutel | AzureHeaders#PARTITION_KEY | Snaar | De partitie-hashsleutel als deze is ingesteld bij het oorspronkelijk publiceren van de gebeurtenis. |
Partitie-id | AzureHeaders#RAW_PARTITION_ID | Snaar | De partitie-id van de Event Hub. |
Volgnummer | EventHubsHeaders#SEQUENCE_NUMBER | Lang | Het volgnummer dat is toegewezen aan de gebeurtenis toen deze werd gevraagd in de bijbehorende Event Hub-partitie. |
Laatst geïnventueerde gebeurteniseigenschappen | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | De eigenschappen van de laatste enqueuedgebeurtenis in deze partitie. |
NA | AzureHeaders#CHECKPOINTER | Controlepunt | De koptekst voor het controlepunt van het specifieke bericht. |
Gebruikers kunnen de berichtkoppen parseren voor de gerelateerde informatie van elke gebeurtenis. Als u een berichtkop voor de gebeurtenis wilt instellen, worden alle aangepaste headers als een toepassingseigenschap van een gebeurtenis geplaatst, waarbij de header als eigenschapssleutel wordt ingesteld. Wanneer gebeurtenissen worden ontvangen van Event Hubs, worden alle toepassingseigenschappen geconverteerd naar de berichtkop.
Notitie
Berichtkoppen van partitiesleutel, ge enqueued tijd, offset en volgnummer worden niet ondersteund om handmatig in te stellen.
Wanneer de batchconsumermodus is ingeschakeld, worden de specifieke headers van batchberichten als volgt weergegeven. Deze bevat een lijst met waarden van elke Event Hubs-gebeurtenis.
Toewijzing tussen Event Hubs-bericht-/gebeurteniseigenschappen en Spring Message-headers in batch-listenermodus:
Event Hubs-gebeurteniseigenschappen | Spring Batch-berichtkopconstanten | Type | Beschrijving |
---|---|---|---|
Tijd van enquête | EventHubsHeaders#ENQUEUED_TIME | Lijst met direct | Lijst van de directe, in UTC, van wanneer elke gebeurtenis is geïnventareerd in de Event Hub-partitie. |
Afstand | EventHubsHeaders#OFFSET | Lijst met lange | Lijst van de offset van elke gebeurtenis wanneer deze is ontvangen van de bijbehorende Event Hub-partitie. |
Partitiesleutel | AzureHeaders#PARTITION_KEY | Lijst met tekenreeksen | Lijst met de partitie-hashsleutel als deze is ingesteld bij het oorspronkelijk publiceren van elke gebeurtenis. |
Volgnummer | EventHubsHeaders#SEQUENCE_NUMBER | Lijst met lange | Lijst met het volgnummer dat aan elke gebeurtenis is toegewezen toen deze werd gevraagt in de bijbehorende Event Hub-partitie. |
Systeemeigenschappen | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lijst met kaart | Lijst met de systeemeigenschappen van elke gebeurtenis. |
Toepassingseigenschappen | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lijst met kaart | Lijst met de toepassingseigenschappen van elke gebeurtenis, waarbij alle aangepaste berichtkoppen of gebeurteniseigenschappen worden geplaatst. |
Notitie
Wanneer u berichten publiceert, worden alle bovenstaande batchheaders verwijderd uit de berichten, indien aanwezig.
Monsters
Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.
Spring-integratie met Azure Service Bus
Sleutelbegrippen
Spring Integration maakt lichtgewicht berichten mogelijk binnen Spring-toepassingen en ondersteunt integratie met externe systemen via declaratieve adapters.
Het Spring Integration for Azure Service Bus-extensieproject biedt binnenkomende en uitgaande kanaaladapters voor Azure Service Bus.
Notitie
CompleteFuture-ondersteunings-API's zijn afgeschaft van versie 2.10.0 en vervangen door Reactor Core van versie 4.0.0. Zie Javadoc voor meer informatie.
Afhankelijkheid instellen
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configuratie
Deze starter biedt de volgende twee onderdelen van configuratieopties:
Eigenschappen van verbindingsconfiguratie
Deze sectie bevat de configuratieopties die worden gebruikt om verbinding te maken met Azure Service Bus.
Notitie
Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.
Configureerbare eigenschappen van spring-cloud-azure-starter-integration-servicebus:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.servicebus.enabled | booleaans | Of een Azure Service Bus is ingeschakeld. |
spring.cloud.azure.servicebus.connection-string | Snaar | Service Bus-naamruimteverbindingsreekswaarde. |
spring.cloud.azure.servicebus.custom-endpoint-address | Snaar | Het aangepaste eindpuntadres dat moet worden gebruikt bij het maken van verbinding met Service Bus. |
spring.cloud.azure.servicebus.namespace | Snaar | Service Bus-naamruimtewaarde, het voorvoegsel van de FQDN. Een FQDN moet bestaan uit NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Snaar | Domeinnaam van een Azure Service Bus-naamruimtewaarde. |
Configuratie-eigenschappen van Service Bus-processor
De ServiceBusInboundChannelAdapter
gebruikt de ServiceBusProcessorClient
om berichten te gebruiken, om de algemene eigenschappen van een ServiceBusProcessorClient
te configureren, kunnen ontwikkelaars ServiceBusContainerProperties
gebruiken voor de configuratie. Zie de volgende sectie over het werken met ServiceBusInboundChannelAdapter
.
Basisgebruik
Berichten verzenden naar Azure Service Bus
Vul de configuratieopties voor referenties in.
Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Maak
DefaultMessageHandler
met deServiceBusTemplate
bean om berichten naar Service Bus te verzenden, stel het entiteitstype voor de ServiceBusTemplate in. In dit voorbeeld wordt Service Bus Queue gebruikt als voorbeeld.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Maak een berichtgatewaybinding met de bovenstaande berichthandler via een berichtkanaal.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Berichten verzenden met behulp van de gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Berichten ontvangen van Azure Service Bus
Vul de configuratieopties voor referenties in.
Maak een san van het berichtkanaal als invoerkanaal.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Maak
ServiceBusInboundChannelAdapter
met deServiceBusMessageListenerContainer
bean om berichten naar Service Bus te ontvangen. In dit voorbeeld wordt Service Bus Queue gebruikt als voorbeeld.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Maak een berichtontvangerbinding met
ServiceBusInboundChannelAdapter
via het berichtkanaal dat we eerder hebben gemaakt.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
ServiceBusMessage Converter configureren om objectMapper aan te passen
ServiceBusMessageConverter
wordt gemaakt als configureerbare bean, zodat gebruikers ObjectMapper
kunnen aanpassen.
Service Bus-berichtkoppen
Voor sommige Service Bus-headers die kunnen worden toegewezen aan meerdere Spring-headerconstanten, wordt de prioriteit van verschillende Spring-headers vermeld.
Toewijzing tussen Service Bus-headers en Spring-headers:
Service Bus-berichtkoppen en -eigenschappen | Koptekstconstanten van springbericht | Type | Configureerbare | Beschrijving |
---|---|---|---|---|
Inhoudstype | MessageHeaders#CONTENT_TYPE |
Snaar | Ja | De RFC2045 inhoudstypedescriptor van het bericht. |
Correlatie-id | ServiceBusMessageHeaders#CORRELATION_ID |
Snaar | Ja | De correlatie-id van het bericht |
Bericht-id | ServiceBusMessageHeaders#MESSAGE_ID |
Snaar | Ja | De bericht-id van het bericht heeft deze koptekst een hogere prioriteit dan MessageHeaders#ID . |
Bericht-id | MessageHeaders#ID |
UUID | Ja | De bericht-id van het bericht heeft een lagere prioriteit dan ServiceBusMessageHeaders#MESSAGE_ID . |
Partitiesleutel | ServiceBusMessageHeaders#PARTITION_KEY |
Snaar | Ja | De partitiesleutel voor het verzenden van het bericht naar een gepartitioneerde entiteit. |
Beantwoorden | MessageHeaders#REPLY_CHANNEL |
Snaar | Ja | Het adres van een entiteit waarnaar antwoorden moeten worden verzonden. |
Sessie-id beantwoorden | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Snaar | Ja | De eigenschapswaarde ReplyToGroupId van het bericht. |
Geplande tijd in wachtrij utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Ja | De datum/tijd waarop het bericht moet worden ge enqueued in Service Bus, deze kop heeft een hogere prioriteit dan AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Geplande tijd in wachtrij utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Geheel getal | Ja | De datum/tijd waarop het bericht in Service Bus moet worden verzonden, heeft deze koptekst een lagere prioriteit dan ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
Sessie-id | ServiceBusMessageHeaders#SESSION_ID |
Snaar | Ja | De sessie-IDentifier voor een sessiebewuste entiteit. |
Time to live | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duur | Ja | De duur van de tijd voordat dit bericht verloopt. |
Aan | ServiceBusMessageHeaders#TO |
Snaar | Ja | Het 'aan'-adres van het bericht, gereserveerd voor toekomstig gebruik in routeringsscenario's en momenteel genegeerd door de broker zelf. |
Onderwerp | ServiceBusMessageHeaders#SUBJECT |
Snaar | Ja | Het onderwerp voor het bericht. |
Beschrijving van fout met dode letter | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Snaar | Nee | De beschrijving van een bericht dat in een dode letter is geschreven. |
Reden voor dode letter | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Snaar | Nee | De reden dat een bericht dood is geschreven. |
Bron van dode letter | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Snaar | Nee | De entiteit waarin het bericht dood is geschreven. |
Aantal bezorgingen | ServiceBusMessageHeaders#DELIVERY_COUNT |
lang | Nee | Het aantal keren dat dit bericht aan clients is bezorgd. |
Gesemplementeerd volgnummer | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
lang | Nee | Het volgnummer dat is toegewezen aan een bericht door Service Bus. |
Tijd van enquête | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Nee | De datum/tijd waarop dit bericht is verzonden in Service Bus. |
Verloopt op | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Nee | De datum/tijd waarop dit bericht verloopt. |
Token vergrendelen | ServiceBusMessageHeaders#LOCK_TOKEN |
Snaar | Nee | Het vergrendelingstoken voor het huidige bericht. |
Vergrendeld tot | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Nee | De datum/tijd waarop de vergrendeling van dit bericht verloopt. |
Volgnummer | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
lang | Nee | Het unieke nummer dat is toegewezen aan een bericht door Service Bus. |
Staat | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Nee | De status van het bericht, die actief, uitgesteld of gepland kan zijn. |
Ondersteuning voor partitiesleutels
Deze starter ondersteunt Service Bus-partitionering door het instellen van de partitiesleutel en sessie-id in de berichtkop. In deze sectie wordt uitgelegd hoe u een partitiesleutel instelt voor berichten.
Aanbevolen: gebruik ServiceBusMessageHeaders.PARTITION_KEY
als sleutel van de koptekst.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Niet aanbevolen, maar momenteel ondersteund:AzureHeaders.PARTITION_KEY
als sleutel van de header.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Notitie
Wanneer zowel ServiceBusMessageHeaders.PARTITION_KEY
als AzureHeaders.PARTITION_KEY
zijn ingesteld in de berichtkoppen, heeft ServiceBusMessageHeaders.PARTITION_KEY
de voorkeur.
Sessieondersteuning
In dit voorbeeld ziet u hoe u de sessie-id van een bericht in de toepassing handmatig instelt.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Notitie
Wanneer de ServiceBusMessageHeaders.SESSION_ID
is ingesteld in de berichtkoppen en er ook een andere ServiceBusMessageHeaders.PARTITION_KEY
koptekst is ingesteld, wordt de waarde van de sessie-id uiteindelijk gebruikt om de waarde van de partitiesleutel te overschrijven.
Eigenschappen van Service Bus-client aanpassen
Ontwikkelaars kunnen AzureServiceClientBuilderCustomizer
gebruiken om eigenschappen van de Service Bus-client aan te passen. In het volgende voorbeeld wordt de eigenschap sessionIdleTimeout
in ServiceBusClientBuilder
aangepast:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
Monsters
Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.
Spring-integratie met Azure Storage Queue
Sleutelbegrippen
Azure Queue Storage is een service voor het opslaan van grote aantallen berichten. U opent berichten vanaf elke locatie ter wereld via geverifieerde aanroepen via HTTP of HTTPS. Een wachtrijbericht kan maximaal 64 kB groot zijn. Een wachtrij kan miljoenen berichten bevatten, tot de totale capaciteitslimiet van een opslagaccount. Wachtrijen worden vaak gebruikt om een achterstand van werk te maken om asynchroon te verwerken.
Afhankelijkheid instellen
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configuratie
Deze starter biedt de volgende configuratieopties:
Eigenschappen van verbindingsconfiguratie
Deze sectie bevat de configuratieopties die worden gebruikt voor het maken van verbinding met Azure Storage Queue.
Notitie
Als u ervoor kiest om een beveiligingsprincipaal te gebruiken om te verifiëren en autoriseren met Microsoft Entra ID voor toegang tot een Azure-resource, raadpleegt u Toegang autoriseren met Microsoft Entra ID om ervoor te zorgen dat de beveiligingsprincipaal de juiste machtiging heeft gekregen voor toegang tot de Azure-resource.
Configureerbare verbindingseigenschappen van spring-cloud-azure-starter-integration-storage-queue:
Eigenschap | Type | Beschrijving |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booleaans | Of een Azure Storage-wachtrij is ingeschakeld. |
spring.cloud.azure.storage.queue.connection-string | Snaar | Verbindingsreekswaarde voor opslagwachtrijnaamruimte. |
spring.cloud.azure.storage.queue.accountName | Snaar | Naam van opslagwachtrijaccount. |
spring.cloud.azure.storage.queue.accountKey | Snaar | Opslagwachtrijaccountsleutel. |
spring.cloud.azure.storage.queue.endpoint | Snaar | Service-eindpunt voor opslagwachtrij. |
spring.cloud.azure.storage.queue.sasToken | Snaar | Sas-tokenreferenties |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion die wordt gebruikt bij het maken van API-aanvragen. |
spring.cloud.azure.storage.queue.messageEncoding | Snaar | Wachtrijberichtcodering. |
Basisgebruik
Berichten verzenden naar Azure Storage Queue
Vul de configuratieopties voor referenties in.
Voor referenties als verbindingsreeks configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
Voor referenties als beheerde identiteiten configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Voor referenties als service-principal configureert u de volgende eigenschappen in uw application.yml-bestand:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
Notitie
De waarden die zijn toegestaan voor tenant-id
zijn: common
, organizations
, consumers
of de tenant-id. Zie voor meer informatie over deze waarden het Het verkeerde eindpunt (persoonlijke en organisatieaccounts) gebruikt sectie van Fout AADSTS50020 - Gebruikersaccount van id-provider bestaat niet in tenant. Zie App met één tenant converteren naar multitenant op Microsoft Entra IDvoor meer informatie over het converteren van uw app met één tenant.
Maak
DefaultMessageHandler
met deStorageQueueTemplate
bean om berichten naar de opslagwachtrij te verzenden.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Maak een Message Gateway-binding met de bovenstaande berichthandler via een berichtkanaal.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Berichten verzenden met behulp van de gateway.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Berichten ontvangen van Azure Storage Queue
Vul de configuratieopties voor referenties in.
Maak een san van het berichtkanaal als invoerkanaal.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Maak
StorageQueueMessageSource
met deStorageQueueTemplate
bean voor het ontvangen van berichten naar de opslagwachtrij.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Maak een berichtontvangerbinding met StorageQueueMessageSource die in de laatste stap is gemaakt via het berichtkanaal dat we eerder hebben gemaakt.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Monsters
Zie de azure-spring-boot-samples opslagplaats op GitHub voor meer informatie.