Suporte do Spring Cloud Azure para integração com o Spring
Este artigo aplica-se a: ✔️ Versão 4.14.0 Versão 5.8.0 ✔️
A Extensão de Integração do Spring para o Azure fornece adaptadores de Integração do Spring para os vários serviços fornecidos pelo SDK do Azure para Java. Fornecemos suporte de integração Spring para estes serviços do Azure: Hubs de Eventos, Barramento de Serviço, Fila de Armazenamento. A seguir está uma lista de adaptadores suportados:
spring-cloud-azure-starter-integration-eventhubs
- para obter mais informações, consulte Integração do Spring com Hubs de Eventos do Azurespring-cloud-azure-starter-integration-servicebus
- para obter mais informações, consulte Integração do Spring com o Barramento de Serviço do Azurespring-cloud-azure-starter-integration-storage-queue
- para obter mais informações, consulte Integração do Spring com a Fila de Armazenamento do Azure
Integração do Spring com Hubs de Eventos do Azure
Conceitos principais
Os Hubs de Eventos do Azure são uma plataforma de streaming de Big Data e um serviço de ingestão de eventos. Ele pode receber e processar milhões de eventos por segundo. Os dados enviados para um hub de eventos podem ser transformados e armazenados usando qualquer provedor de análise em tempo real ou adaptadores de envio em lote/armazenamento.
O Spring Integration permite mensagens leves em aplicativos baseados no Spring e oferece suporte à integração com sistemas externos por meio de adaptadores declarativos. Esses adaptadores fornecem um nível mais alto de abstração em relação ao suporte do Spring para comunicação remota, mensagens e agendamento. O projeto de extensão Spring Integration for Event Hubs fornece adaptadores de canal de entrada e saída e gateways para Hubs de Eventos do Azure.
Observação
As APIs de suporte do RxJava são descartadas da versão 4.0.0. Consulte Javadoc para obter detalhes.
Grupo de consumidores
Os Hubs de Eventos fornecem suporte de grupo de consumidores semelhante ao Apache Kafka, mas com lógica ligeiramente diferente. Enquanto o Kafka armazena todos os deslocamentos confirmados no broker, você precisa armazenar deslocamentos de mensagens dos Hubs de Eventos que estão sendo processados manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.
Suporte ao particionamento
Os Hubs de Eventos fornecem um conceito semelhante de partição física como Kafka. Mas, ao contrário do rebalanceamento automático de Kafka entre consumidores e partições, os Hubs de Eventos fornecem uma espécie de modo preventivo. A conta de armazenamento atua como uma concessão para determinar qual partição pertence a qual consumidor. Quando um novo consumidor é iniciado, ele tentará roubar algumas partições da maioria dos consumidores carregados para obter o balanceamento de carga de trabalho.
Para especificar a estratégia de balanceamento de carga, os desenvolvedores podem usar EventHubsContainerProperties
para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerProperties
o .
Suporte ao consumidor em lote
O EventHubsInboundChannelAdapter
suporta o modo de consumo em lote. Para habilitá-lo, os usuários podem especificar o modo de ouvinte como ListenerMode.BATCH
ao construir uma EventHubsInboundChannelAdapter
instância.
Quando habilitada, uma Mensagem da qual a carga útil é uma lista de eventos em lote será recebida e passada para o canal downstream. Cada cabeçalho de mensagem também é convertido como uma lista, cujo conteúdo é o valor de cabeçalho associado analisado de cada evento. Para os cabeçalhos comuns de ID de partição, ponteiro de verificação e últimas propriedades enfileiradas, eles são apresentados como um único valor para todo o lote de compartilhamentos de eventos do mesmo. Para obter mais informações, consulte a seção Cabeçalhos de Mensagem dos Hubs de Eventos.
Observação
O cabeçalho do ponto de verificação só existe quando o modo de ponto de verificação MANUAL é usado.
O ponto de verificação do consumidor em lote oferece suporte a dois modos: BATCH
e MANUAL
. BATCH
mode é um modo de checkpoint automático para verificar todo o lote de eventos juntos depois que eles são recebidos. MANUAL
modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer será passado para o cabeçalho da mensagem e os usuários poderão usá-lo para fazer o checkpointing.
A política de consumo em lote pode ser especificada pelas propriedades de max-size
e , onde max-size
é uma propriedade necessária enquanto max-wait-time
é max-wait-time
opcional.
Para especificar a estratégia de consumo de lote, os desenvolvedores podem usar EventHubsContainerProperties
para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerProperties
o .
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configuração
Este iniciador fornece as seguintes 3 partes de opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.
Observação
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-starter-integration-eventhubs:
Propriedade | Type | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.enabled | boolean | Se um Hubs de Eventos do Azure está habilitado. |
spring.cloud.azure.eventhubs.connection-string | String | Valor da cadeia de conexão do namespace dos Hubs de Eventos. |
spring.cloud.azure.eventhubs.namespace | String | Valor do namespace dos Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto de NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | String | Nome de domínio de um valor de namespace dos Hubs de Eventos do Azure. |
spring.cloud.azure.eventhubs.custom-endpoint-address | String | Endereço de ponto de extremidade personalizado. |
spring.cloud.azure.eventhubs.shared-connection | Booliano | Se o EventProcessorClient e o EventHubProducerAsyncClient subjacentes usam a mesma conexão. Por padrão, uma nova conexão é construída e usada criada para cada cliente do Hub de Eventos criado. |
Propriedades de configuração do ponto de verificação
Esta seção contém as opções de configuração para o serviço Blobs de Armazenamento, que é usado para a persistência de informações de propriedade e ponto de verificação de partição.
Observação
A partir da versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente.
Propriedades configuráveis de checkpoint de spring-cloud-azure-starter-integration-eventhubs:
Propriedade | Type | Descrição |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | Booliano | Se a criação de contêineres deve ser permitida, se não existir. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | String | Nome da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | String | Chave de acesso da conta de armazenamento. |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | String | Nome do contêiner de armazenamento. |
As opções de configuração comuns do SDK do Serviço do Azure também são configuráveis para o armazenamento do ponto de verificação do Blob de Armazenamento. As opções de configuração com suporte são introduzidas na configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado ou o prefixo spring.cloud.azure.
de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriedades de configuração do processador do Hub de Eventos
O EventHubsInboundChannelAdapter
usa o EventProcessorClient
para consumir mensagens de um hub de eventos, para configurar as propriedades gerais de um EventProcessorClient
, os desenvolvedores podem usar EventHubsContainerProperties
para a configuração. Consulte a seção a seguir sobre como trabalhar com EventHubsInboundChannelAdapter
o .
Uso básico
Enviar mensagens para os Hubs de Eventos do Azure
Preencha as opções de configuração de credenciais.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Observação
Os valores permitidos sãotenant-id
: common
, , , organizations
consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) do Erro AADSTS50020 - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Crie
DefaultMessageHandler
com oEventHubsTemplate
bean para enviar mensagens aos Hubs de Eventos.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Crie uma associação de gateway de mensagens com o manipulador de mensagens acima por meio de um canal de mensagens.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Envie mensagens usando o gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Receber mensagens dos Hubs de Eventos do Azure
Preencha as opções de configuração de credenciais.
Crie um bean de canal de mensagem como o canal de entrada.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Crie
EventHubsInboundChannelAdapter
com oEventHubsMessageListenerContainer
bean para receber mensagens dos Hubs de Eventos.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Crie uma associação de receptor de mensagem com EventHubsInboundChannelAdapter por meio do canal de mensagem criado anteriormente.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurar EventHubsMessageConverter para personalizar objectMapper
EventHubsMessageConverter
é feito como um bean configurável para permitir que os usuários personalizem o ObjectMapper.
Suporte ao consumidor em lote
Para consumir mensagens de Hubs de Eventos em lotes é semelhante com o exemplo acima, além de que os usuários devem definir as opções de configuração relacionadas ao consumo de lote para EventHubsInboundChannelAdapter
.
Ao criar EventHubsInboundChannelAdapter
, o modo de ouvinte deve ser definido como BATCH
. Ao criar o bean do , defina o modo de ponto de verificação como ou MANUAL
BATCH
, e as opções em lote podem ser configuradas EventHubsMessageListenerContainer
conforme necessário.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
Cabeçalhos de mensagens dos Hubs de Eventos
A tabela a seguir ilustra como as propriedades de mensagem dos Hubs de Eventos são mapeadas para cabeçalhos de mensagem do Spring. Para Hubs de Eventos do Azure, a mensagem é chamada como event
.
Mapeando entre Hubs de Eventos Propriedades de Mensagens/Eventos e Cabeçalhos de Mensagens Spring no Modo de Escuta de Registros:
Propriedades do evento Hubs de Eventos | Constantes de cabeçalho de mensagem de primavera | Tipo | Descrição |
---|---|---|---|
Tempo enfileirado | EventHubsHeaders#ENQUEUED_TIME | Instantâneos | O instante, em UTC, de quando o evento foi enfileirado na partição do Hub de Eventos. |
Compensação | EventHubsHeaders#OFFSET | Longo | O deslocamento do evento quando ele foi recebido da partição do Hub de Eventos associada. |
Chave de partição | AzureHeaders#PARTITION_KEY | String | A chave de hash de partição se ela foi definida ao publicar originalmente o evento. |
ID da Partição | AzureHeaders#RAW_PARTITION_ID | String | A ID de partição do Hub de Eventos. |
Número de sequência | EventHubsHeaders#SEQUENCE_NUMBER | Longo | O número de sequência atribuído ao evento quando ele foi enfileirado na partição do Hub de Eventos associada. |
Propriedades do último evento enfileirado | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | As propriedades do último evento enfileirado nesta partição. |
NA | AzureHeaders#CHECKPOINTER | Ponteiro de verificação | O cabeçalho do ponto de verificação da mensagem específica. |
Os usuários podem analisar os cabeçalhos das mensagens para obter as informações relacionadas de cada evento. Para definir um cabeçalho de mensagem para o evento, todos os cabeçalhos personalizados serão colocados como uma propriedade de aplicativo de um evento, onde o cabeçalho é definido como a chave de propriedade. Quando os eventos são recebidos dos Hubs de Eventos, todas as propriedades do aplicativo serão convertidas no cabeçalho da mensagem.
Observação
Não há suporte para cabeçalhos de mensagem de chave de partição, tempo enfileirado, deslocamento e número de sequência para serem definidos manualmente.
Quando o modo de consumidor em lote está habilitado, os cabeçalhos específicos das mensagens em lote são listados da seguinte forma, que contém uma lista de valores de cada evento único dos Hubs de Eventos.
Mapeando entre Propriedades de Mensagem/Evento de Hubs de Eventos e Cabeçalhos de Mensagens de Primavera no Modo de Escuta em Lote:
Propriedades do evento Hubs de Eventos | Constantes de cabeçalho de mensagem em lote de mola | Tipo | Descrição |
---|---|---|---|
Tempo enfileirado | EventHubsHeaders#ENQUEUED_TIME | Lista de Instantâneos | Lista do instante, em UTC, de quando cada evento foi enfileirado na partição do Hub de Eventos. |
Compensação | EventHubsHeaders#OFFSET | Lista de Long | Lista do deslocamento de cada evento quando ele foi recebido da partição do Hub de Eventos associada. |
Chave de partição | AzureHeaders#PARTITION_KEY | Lista de String | Lista da chave de hash de partição se ela foi definida ao publicar originalmente cada evento. |
Número de sequência | EventHubsHeaders#SEQUENCE_NUMBER | Lista de Long | Lista do número de sequência atribuído a cada evento quando ele foi enfileirado na partição do Hub de Eventos associada. |
Propriedades do sistema | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Lista de Mapa | Lista das propriedades do sistema de cada evento. |
Propriedades do aplicativo | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Lista de Mapa | Lista das propriedades do aplicativo de cada evento, onde todos os cabeçalhos de mensagem personalizados ou propriedades de evento são colocados. |
Observação
Ao publicar mensagens, todos os cabeçalhos em lote acima serão removidos das mensagens, se existirem.
Exemplos
Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.
Integração do Spring com o Barramento de Serviço do Azure
Conceitos principais
O Spring Integration permite mensagens leves em aplicativos baseados no Spring e oferece suporte à integração com sistemas externos por meio de adaptadores declarativos.
O projeto de extensão Spring Integration for Azure Service Bus fornece adaptadores de canal de entrada e saída para o Barramento de Serviço do Azure.
Observação
As APIs de suporte do CompletableFuture foram preteridas a partir da versão 2.10.0 e substituídas pelo Reactor Core a partir da versão 4.0.0. Consulte Javadoc para obter detalhes.
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configuração
Este iniciador fornece as seguintes 2 partes de opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.
Observação
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-starter-integration-servicebus:
Propriedade | Type | Descrição |
---|---|---|
spring.cloud.azure.servicebus.enabled | boolean | Se um Barramento de Serviço do Azure está habilitado. |
spring.cloud.azure.servicebus.connection-string | String | Valor da cadeia de conexão do Namespace do Barramento de Serviço. |
spring.cloud.azure.servicebus.namespace | String | Valor de namespace do Barramento de Serviço, que é o prefixo do FQDN. Um FQDN deve ser composto de NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | String | Nome de domínio de um valor de namespace do Barramento de Serviço do Azure. |
Propriedades de configuração do processador do Service Bus
O ServiceBusInboundChannelAdapter
usa o ServiceBusProcessorClient
para consumir mensagens, para configurar as propriedades gerais de um ServiceBusProcessorClient
, os desenvolvedores podem usar ServiceBusContainerProperties
para a configuração. Consulte a seção a seguir sobre como trabalhar com ServiceBusInboundChannelAdapter
o .
Uso básico
Enviar mensagens para o Barramento de Serviço do Azure
Preencha as opções de configuração de credenciais.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Observação
Os valores permitidos sãotenant-id
: common
, , , organizations
consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) do Erro AADSTS50020 - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Observação
Os valores permitidos sãotenant-id
: common
, , , organizations
consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) do Erro AADSTS50020 - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Crie
DefaultMessageHandler
com o bean para enviar mensagens ao Service Bus, defina o tipo de entidade para oServiceBusTemplate
ServiceBusTemplate. Este exemplo usa a Fila do Barramento de Serviço como exemplo.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Crie uma associação de gateway de mensagens com o manipulador de mensagens acima por meio de um canal de mensagens.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Envie mensagens usando o gateway.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Receber mensagens do Barramento de Serviço do Azure
Preencha as opções de configuração de credenciais.
Crie um bean de canal de mensagem como o canal de entrada.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Crie
ServiceBusInboundChannelAdapter
com oServiceBusMessageListenerContainer
bean para receber mensagens no Service Bus. Este exemplo usa a Fila do Barramento de Serviço como exemplo.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Crie uma ligação de receptor de mensagem com através do canal de
ServiceBusInboundChannelAdapter
mensagem que criamos antes.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurar ServiceBusMessageConverter para personalizar objectMapper
ServiceBusMessageConverter
é feito como um bean configurável para permitir que os usuários personalizem ObjectMapper
o .
Cabeçalhos de mensagem do Barramento de Serviço
Para alguns cabeçalhos do Barramento de Serviço que podem ser mapeados para várias constantes de cabeçalho Spring, a prioridade de diferentes cabeçalhos Spring é listada.
Mapeamento entre cabeçalhos do Service Bus e cabeçalhos de mola:
Cabeçalhos e propriedades de mensagens do Barramento de Serviço | Constantes de cabeçalho de mensagem de mola | Tipo | Configurável | Descrição |
---|---|---|---|---|
Tipo de conteúdo | MessageHeaders#CONTENT_TYPE |
String | Sim | O RFC2045 descritor Content-Type da mensagem. |
ID de Correlação | ServiceBusMessageHeaders#CORRELATION_ID |
String | Sim | A ID de correlação da mensagem |
ID da mensagem | ServiceBusMessageHeaders#MESSAGE_ID |
String | Sim | O ID da mensagem da mensagem, este cabeçalho tem prioridade mais alta do que MessageHeaders#ID . |
ID da mensagem | MessageHeaders#ID |
UUID | Sim | O ID da mensagem da mensagem, este cabeçalho tem prioridade mais baixa do que ServiceBusMessageHeaders#MESSAGE_ID . |
Chave de partição | ServiceBusMessageHeaders#PARTITION_KEY |
String | Sim | A chave de partição para enviar a mensagem para uma entidade particionada. |
Responder a | MessageHeaders#REPLY_CHANNEL |
String | Sim | O endereço de uma entidade para a qual enviar respostas. |
Responder ao ID da sessão | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
String | Sim | O valor da propriedade ReplyToGroupId da mensagem. |
Tempo de enfileiramento agendado utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Sim | A data/hora em que a mensagem deve ser enfileirada no Service Bus, esse cabeçalho tem prioridade mais alta que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Tempo de enfileiramento agendado utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Inteiro | Sim | A data/hora em que a mensagem deve ser enfileirada no Service Bus, esse cabeçalho tem prioridade menor que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
ID da Sessão | ServiceBusMessageHeaders#SESSION_ID |
String | Sim | O IDentifier de sessão para uma entidade com reconhecimento de sessão. |
Vida útil | ServiceBusMessageHeaders#TIME_TO_LIVE |
Duration | Sim | A duração do tempo antes desta mensagem expirar. |
Para | ServiceBusMessageHeaders#TO |
String | Sim | O endereço "para" da mensagem, reservado para uso futuro em cenários de roteamento e atualmente ignorado pelo próprio broker. |
Assunto | ServiceBusMessageHeaders#SUBJECT |
String | Sim | O assunto da mensagem. |
Descrição do erro de letra morta | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
String | Não | A descrição de uma mensagem que foi escrita em letra morta. |
Razão letra morta | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
String | Não | O motivo de uma mensagem ter sido escrita em letra morta. |
Fonte de letra morta | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
String | Não | A entidade na qual a mensagem foi escrita em letra morta. |
Contagem de entrega | ServiceBusMessageHeaders#DELIVERY_COUNT |
longo | Não | O número de vezes que essa mensagem foi entregue aos clientes. |
Número de sequência enfileirado | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
longo | Não | O número de sequência enfileirado atribuído a uma mensagem pelo Service Bus. |
Tempo enfileirado | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Não | A data/hora em que essa mensagem foi enfileirada no Service Bus. |
Expira em | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Não | A data/hora em que essa mensagem expirará. |
Token de bloqueio | ServiceBusMessageHeaders#LOCK_TOKEN |
String | Não | O token de bloqueio para a mensagem atual. |
Bloqueado até | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Não | A data/hora em que o bloqueio desta mensagem expira. |
Número de sequência | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
longo | Não | O número exclusivo atribuído a uma mensagem pelo Service Bus. |
State | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Não | O estado da mensagem, que pode ser Ativo, Adiado ou Agendado. |
Suporte a chave de partição
Esse iniciador oferece suporte ao particionamento do Barramento de Serviço, permitindo a configuração da chave de partição e da ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.
Recomendado: Use ServiceBusMessageHeaders.PARTITION_KEY
como a chave do cabeçalho.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Não recomendado, mas atualmente suportado:AzureHeaders.PARTITION_KEY
como a chave do cabeçalho.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Observação
Quando ambos ServiceBusMessageHeaders.PARTITION_KEY
e AzureHeaders.PARTITION_KEY
são definidos nos cabeçalhos da mensagem, ServiceBusMessageHeaders.PARTITION_KEY
é preferível.
Suporte de sessão
Este exemplo demonstra como definir manualmente a ID da sessão de uma mensagem no aplicativo.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Observação
Quando o é definido nos cabeçalhos da mensagem e um cabeçalho diferente ServiceBusMessageHeaders.PARTITION_KEY
também é definido, o valor da ID da sessão será eventualmente usado para substituir o ServiceBusMessageHeaders.SESSION_ID
valor da chave de partição.
Exemplos
Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.
Integração do Spring com filas de Armazenamento do Azure
Conceitos principais
O armazenamento de Filas do Azure é um serviço usado para armazenar grandes quantidades de mensagens. Você acessa as mensagens em qualquer lugar do mundo por meio de chamadas autenticadas usando HTTP ou HTTPS. Uma mensagem da fila pode ter até 64 KB. Uma fila pode conter milhões de mensagens, até o limite da capacidade total de uma conta de armazenamento. As filas são normalmente usadas para criar uma lista de pendências de trabalho para processamento assíncrono.
Configuração de dependência
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configuração
Este iniciador fornece as seguintes opções de configuração:
Propriedades de configuração de conexão
Esta seção contém as opções de configuração usadas para se conectar à Fila de Armazenamento do Azure.
Observação
Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.
Propriedades configuráveis de conexão de spring-cloud-azure-starter-integration-storage-queue:
Propriedade | Type | Descrição |
---|---|---|
primavera.cloud.azure.storage.queue.enabled | boolean | Se uma Fila de Armazenamento do Azure está habilitada. |
spring.cloud.azure.storage.queue.connection-string | String | Valor da cadeia de conexão do namespace da fila de armazenamento. |
spring.cloud.azure.storage.queue.accountName | String | Nome da conta da Fila de Armazenamento. |
spring.cloud.azure.storage.queue.accountKey | String | Chave de conta da Fila de Armazenamento. |
spring.cloud.azure.storage.queue.endpoint | String | Ponto de extremidade do serviço de fila de armazenamento. |
spring.cloud.azure.storage.queue.sasToken | String | Credencial de token Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion que é usado ao fazer solicitações de API. |
spring.cloud.azure.storage.queue.messageEncoding | String | Codificação de mensagem de fila. |
Uso básico
Enviar mensagens para a Fila de Armazenamento do Azure
Preencha as opções de configuração de credenciais.
Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Observação
Os valores permitidos sãotenant-id
: common
, , , organizations
consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) do Erro AADSTS50020 - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Observação
Os valores permitidos sãotenant-id
: common
, , , organizations
consumers
ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e de organização) do Erro AADSTS50020 - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.
Crie
DefaultMessageHandler
com o bean para enviar mensagens para aStorageQueueTemplate
Fila de Armazenamento.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Crie uma associação de gateway de mensagens com o manipulador de mensagens acima por meio de um canal de mensagens.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Envie mensagens usando o gateway.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Receber mensagens da Fila de Armazenamento do Azure
Preencha as opções de configuração de credenciais.
Crie um bean de canal de mensagem como o canal de entrada.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Crie
StorageQueueMessageSource
com oStorageQueueTemplate
bean para receber mensagens na Fila de Armazenamento.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Crie uma associação de receptor de mensagem com StorageQueueMessageSource criada na última etapa por meio do canal de mensagem que criamos antes.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Exemplos
Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.