你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure 服务总线适用于 .NET 的客户端库 - 版本 7.16.2
Azure 服务总线允许你构建利用异步消息传送模式的应用程序,这些模式使用高度可靠的服务在生成者和使用者之间中转消息。 Azure 服务总线在客户端和服务器之间提供灵活的中转消息传送,以及结构化的先入先出 (FIFO) 消息传递,以及具有复杂路由的发布/订阅功能。 如果想要了解有关Azure 服务总线的详细信息,请查看:什么是Azure 服务总线?
使用用于Azure 服务总线的客户端库可以:
传输业务数据:利用消息传送持久交换信息,例如销售或采购订单、日记帐或库存移动。
分离应用程序:提高应用程序和服务的可靠性和可伸缩性,缓解发送方和接收方同时联机的需求。
控制消息的处理方式:支持使用队列的消息的传统竞争使用者,或者允许每个使用者使用主题和订阅自己的消息实例。
实现复杂的工作流:消息会话支持需要消息排序或消息延迟的方案。
源代码 | 包 (NuGet) | API 参考文档 | 产品文档 | 迁移指南 (Microsoft.Azure.ServiceBus) | WindowsAzure.ServiceBus) ( | 迁移指南故障排除指南
入门
先决条件
Microsoft Azure 订阅:若要使用 Azure 服务(包括Azure 服务总线),需要订阅。 如果没有现有的 Azure 帐户,可以在 创建帐户时注册免费试用版或使用 MSDN 订阅者权益。
服务总线命名空间:若要与Azure 服务总线交互,还需要有一个可用的命名空间。 如果不熟悉如何创建 Azure 资源,建议遵循使用 Azure 门户创建服务总线命名空间的分步指南。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建服务总线实体的详细说明。
C# 8.0:Azure 服务总线 客户端库使用 C# 8.0 中引入的新功能。 为了利用 C# 8.0 语法,建议使用语言版本的
latest
.NET Core SDK 3.0 或更高版本进行编译。希望充分利用 C# 8.0 语法的 Visual Studio 用户需要使用 Visual Studio 2019 或更高版本。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。 Visual Studio 2017 的用户可以通过使用 Microsoft.Net.Compilers NuGet 包 和设置语言版本来利用 C# 8 语法,尽管编辑体验可能并不理想。
你仍然可以将库与以前的 C# 语言版本一起使用,但需要手动管理异步可枚举和异步可释放成员,而不是受益于新语法。 仍可能面向 .NET Core SDK 支持的任何框架版本,包括早期版本的 .NET Core 或 .NET Framework。 有关详细信息,请参阅: 如何指定目标框架。
重要说明: 为了在不修改的情况下生成或运行 示例 和 示例 ,必须使用 C# 8.0。 如果决定针对其他语言版本调整示例,仍可以运行这些示例。
若要在 Azure 中快速创建所需的服务总线资源并接收其连接字符串,可以通过单击以下操作部署示例模板:
安装包
使用 NuGet 安装适用于 .NET 的 Azure 服务总线 客户端库:
dotnet add package Azure.Messaging.ServiceBus
验证客户端
若要使服务总线客户端库与队列或主题交互,需要了解如何与其连接和授权。 执行此操作的最简单方法是使用连接字符串,该连接字符串是在创建服务总线命名空间时自动创建的。 如果不熟悉 Azure 中的共享访问策略,建议按照分步指南获取服务总线连接字符串。
拥有连接字符串后,可以使用它对客户端进行身份验证。
// Create a ServiceBusClient that will authenticate using a connection string
string connectionString = "<connection_string>";
await using var client = new ServiceBusClient(connectionString);
若要查看如何使用 Azure.Identity 进行身份验证,请查看此示例。
有关如何对 ASP.NET Core应用程序进行身份验证的示例,请查看此示例。
若要查看如何启动与自定义终结点的连接,请查看 此示例。
关键概念
初始化 ServiceBusClient
后,可以与服务总线命名空间中的主资源类型进行交互,其中可以存在多个资源类型,并且实际消息传输发生在哪个上,命名空间通常充当应用程序容器:
队列:允许发送和接收消息。 通常用于点到点通信。
主题:与队列不同,主题更适合发布/订阅方案。 主题可以发送到 ,但需要订阅,其中可以有多个并行使用。
订阅:要从主题使用的机制。 每个订阅都是独立的,并接收发送到主题的每条消息的副本。 规则和筛选器可用于定制特定订阅接收的消息。
有关这些资源的详细信息,请参阅什么是Azure 服务总线?。
若要与这些资源交互,应熟悉以下 SDK 概念:
服务总线客户端是开发人员与服务总线客户端库交互的主要接口。 它充当与库进行所有交互的网关。
服务总线发送方的范围限定为特定队列或主题,并使用服务总线客户端创建。 发送方允许将消息发送到队列或主题。 它还允许将消息安排在指定日期进行传递。
服务总线接收器的范围限定为特定的队列或订阅,并使用服务总线客户端创建。 接收方允许你从队列或订阅接收消息。 它还允许在收到消息后解决它们。 有四种方法可以解决消息:
- 完成 - 导致消息从队列或主题中删除。
- 放弃 - 释放接收方对消息的锁定,允许其他接收方接收消息。
- 延迟 - 通过正常方式延迟接收消息。 若要接收延迟的消息,需要保留消息的序列号。
- DeadLetter - 将消息移动到死信队列。 这将阻止再次接收消息。 若要从死信队列接收消息,需要一个作用域为死信队列的接收方。
服务总线会话接收器的范围限定为启用会话的特定队列或订阅,并使用服务总线客户端创建。 会话接收器几乎与标准接收器相同,不同之处在于,公开的会话管理操作仅适用于启用会话的实体。 这些操作包括获取和设置会话状态,以及续订会话锁。
服务总线处理器的范围限定为特定的队列或订阅,并使用服务总线客户端创建。
ServiceBusProcessor
可以将 视为围绕一组接收器的抽象。 它使用回调模型允许在收到消息和发生异常时指定代码。 它提供已处理消息的自动完成、自动消息锁续订和用户指定的事件处理程序的并发执行。 由于其功能集,它应该是用于编写从服务总线实体接收的应用程序的工具。 对于处理器无法提供直接使用 ServiceBusReceiver 时所期望的精细控制的复杂情况,建议使用 ServiceBusReceiver。服务总线会话处理器的范围限定为启用会话的特定队列或订阅,并使用服务总线客户端创建。 会话处理器几乎与标准处理器相同,不同之处在于,公开的会话管理操作仅适用于启用会话的实体。
有关更多概念和更深入的讨论,请参阅: 服务总线高级功能。
客户端生存期
ServiceBusClient
、发送方、接收方和处理器可以安全地缓存,并在应用程序的生存期内用作单一实例,这是定期发送或接收消息时的最佳做法。 它们负责高效管理网络、CPU 和内存使用,努力在处于非活动状态期间保持低使用率。
这些类型是可释放的,需要调用 DisposeAsync
或 CloseAsync
来确保正确清理网络资源和其他非托管对象。 请务必注意,当释放实例时 ServiceBusClient
,它将自动关闭并清理使用它创建的任何发送方、接收方和处理器。
线程安全
我们保证所有客户端实例方法都是线程安全的,并且彼此独立 (准则) 。 这可确保重用客户端实例的建议始终是安全的,即使在线程之间也是如此。
其他概念
示例
发送和接收消息
消息发送是使用 ServiceBusSender
执行的。 接收是使用 ServiceBusReceiver
执行的。
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send. UTF-8 encoding is used when providing a string.
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
// send the message
await sender.SendMessageAsync(message);
// create a receiver that we can use to receive the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);
// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// get the message body as a string
string body = receivedMessage.Body.ToString();
Console.WriteLine(body);
发送一批消息
可通过两种方法一次发送多个消息。 执行此操作的第一种方法使用安全批处理。 使用安全批处理,可以创建一个 ServiceBusMessageBatch
对象,这样就可以尝试使用 TryAdd
方法一次向批处理添加一条消息。 如果消息无法容纳在批处理中, TryAdd
将返回 false。
// add the messages that we plan to send to a local queue
Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
messages.Enqueue(new ServiceBusMessage("First message"));
messages.Enqueue(new ServiceBusMessage("Second message"));
messages.Enqueue(new ServiceBusMessage("Third message"));
// create a message batch that we can send
// total number of messages to be sent to the Service Bus queue
int messageCount = messages.Count;
// while all messages are not sent to the Service Bus queue
while (messages.Count > 0)
{
// start a new batch
using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
// add the first message to the batch
if (messageBatch.TryAddMessage(messages.Peek()))
{
// dequeue the message from the .NET queue once the message is added to the batch
messages.Dequeue();
}
else
{
// if the first message can't fit, then it is too large for the batch
throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
}
// add as many messages as possible to the current batch
while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
{
// dequeue the message from the .NET queue as it has been added to the batch
messages.Dequeue();
}
// now, send the batch
await sender.SendMessagesAsync(messageBatch);
// if there are any remaining messages in the .NET queue, the while loop repeats
}
第二种方法使用 SendMessagesAsync
接受 IEnumerable 的 ServiceBusMessage
重载。 使用此方法,我们将尝试将所提供的所有消息放入要发送到服务的单个消息批中。 如果消息太大,无法容纳在单个批处理中,则操作将引发异常。
IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();
messages.Add(new ServiceBusMessage("First"));
messages.Add(new ServiceBusMessage("Second"));
// send the messages
await sender.SendMessagesAsync(messages);
接收一批消息
// create a receiver that we can use to receive the messages
ServiceBusReceiver receiver = client.CreateReceiver(queueName);
// the received message is a different type as it contains some service set properties
// a batch of messages (maximum of 2 in this case) are received
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(maxMessages: 2);
// go through each of the messages received
foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
{
// get the message body as a string
string body = receivedMessage.Body.ToString();
}
完成消息
为了从队列或订阅中删除消息,我们可以调用 CompleteMessageAsync
方法。
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage("Hello world!");
// send the message
await sender.SendMessageAsync(message);
// create a receiver that we can use to receive and settle the message
ServiceBusReceiver receiver = client.CreateReceiver(queueName);
// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// complete the message, thereby deleting it from the service
await receiver.CompleteMessageAsync(receivedMessage);
放弃邮件
放弃消息会释放接收方的锁,从而允许此接收方或其他接收方接收消息。
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
await receiver.AbandonMessageAsync(receivedMessage);
延迟消息
延迟消息将阻止使用 ReceiveMessageAsync
或 ReceiveMessagesAsync
方法再次接收消息。 相反,有单独的方法, ReceiveDeferredMessageAsync
用于 ReceiveDeferredMessagesAsync
接收延迟的消息。
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// defer the message, thereby preventing the message from being received again without using
// the received deferred message API.
await receiver.DeferMessageAsync(receivedMessage);
// receive the deferred message by specifying the service set sequence number of the original
// received message
ServiceBusReceivedMessage deferredMessage = await receiver.ReceiveDeferredMessageAsync(receivedMessage.SequenceNumber);
死信消息
消息的死信类似于延迟,有一个main区别是消息在收到一定次数后,服务会自动死信。 应用程序可以根据要求选择手动死信消息。 当消息为死信时,它实际上会移动到原始队列的子队列中。 请注意,ServiceBusReceiver
无论main队列是否已启用会话,都使用 从死信子队列接收消息。
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();
// Dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
// We can optionally pass a dead letter reason and dead letter description to further describe the reason for dead-lettering the message.
await receiver.DeadLetterMessageAsync(receivedMessage, "sample reason", "sample description");
// receive the dead lettered message with receiver scoped to the dead letter queue.
ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();
// The reason and the description that we specified when dead-lettering the message will be available in the received dead letter message.
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;
有关详细信息,请参阅 ServiceBus 死信队列概述。
使用处理器
ServiceBusProcessor
可以被视为围绕一组接收器的抽象。 它使用回调模型允许在收到消息和发生异常时指定代码。 它提供已处理消息的自动完成、自动消息锁续订和用户指定的事件处理程序的并发执行。 由于其功能集,它应该是用于编写从服务总线实体接收的应用程序的转到工具。 对于处理器无法在直接使用 ServiceBusReceiver 时预期的精细控制的复杂情况,建议使用 ServiceBusReceiver。
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
// since ServiceBusClient implements IAsyncDisposable we create it with "await using"
await using var client = new ServiceBusClient(connectionString);
// create the sender
ServiceBusSender sender = client.CreateSender(queueName);
// create a set of messages that we can send
ServiceBusMessage[] messages = new ServiceBusMessage[]
{
new ServiceBusMessage("First"),
new ServiceBusMessage("Second")
};
// send the message batch
await sender.SendMessagesAsync(messages);
// create the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
{
// By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
// Set AutoCompleteMessages to false to [settle messages](/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
// In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
AutoCompleteMessages = false,
// I can also allow for multi-threading
MaxConcurrentCalls = 2
};
// create a processor that we can use to process the messages
await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options);
// configure the message and error handler to use
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
Console.WriteLine(body);
// we can evaluate application logic and use that to determine how to settle the message.
await args.CompleteMessageAsync(args.Message);
}
Task ErrorHandler(ProcessErrorEventArgs args)
{
// the error source tells me at what point in the processing an error occurred
Console.WriteLine(args.ErrorSource);
// the fully qualified namespace is available
Console.WriteLine(args.FullyQualifiedNamespace);
// as well as the entity path
Console.WriteLine(args.EntityPath);
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
// start processing
await processor.StartProcessingAsync();
// since the processing happens in the background, we add a Console.ReadKey to allow the processing to continue until a key is pressed.
Console.ReadKey();
使用 Azure.Identity 进行身份验证
Azure 标识库为身份验证提供简单的 Azure Active Directory 支持。
// Create a ServiceBusClient that will authenticate through Active Directory
string fullyQualifiedNamespace = "yournamespace.servicebus.windows.net";
await using var client = new ServiceBusClient(fullyQualifiedNamespace, new DefaultAzureCredential());
使用会话
会话 提供了一种用于对相关消息进行分组的机制。 若要使用会话,需要使用已启用会话的实体。
注册 ASP.NET Core依赖项注入
若要在 ASP.NET Core 应用中作为依赖项进行注入ServiceBusClient
,请安装适用于 ASP.NET Core 包的 Azure 客户端库集成。
dotnet add package Microsoft.Extensions.Azure
然后注册配置了服务的客户端。 对于 ASP.NET Core应用程序,这通常直接在 或 StartupConfigureServices
方法中Program.cs
出现:
public void ConfigureServices(IServiceCollection services)
{
services.AddAzureClients(builder =>
{
builder.AddServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");
});
// Register other services, controllers, and other infrastructure.
}
对于喜欢为其客户端使用共享 Azure.Identity
凭据的应用程序,注册看起来略有不同:
public void ConfigureServices(IServiceCollection services)
{
services.AddAzureClients(builder =>
{
// This will register the ServiceBusClient using an Azure Identity credential.
builder.AddServiceBusClientWithNamespace("<< YOUR NAMESPACE >>.servicebus.windows.net");
// By default, DefaultAzureCredential is used, which is likely desired for most
// scenarios. If you need to restrict to a specific credential instance, you could
// register that instance as the default credential instead.
builder.UseCredential(new ManagedIdentityCredential());
});
// Register other services, controllers, and other infrastructure.
}
还可以使用已注册ServiceBusClient
的实例向 DI 注册子客户端,例如 ServiceBusSender
和 ServiceBusReceiver
。 例如,为属于命名空间的每个队列注册发送方:
public async Task ConfigureServicesAsync(IServiceCollection services)
{
// Query the available queues for the Service Bus namespace.
var adminClient = new ServiceBusAdministrationClient("<< SERVICE BUS CONNECTION STRING >>");
var queueNames = new List<string>();
// Because the result is async, they need to be captured to a standard list to avoid async
// calls when registering. Failure to do so results in an error with the services collection.
await foreach (var queue in adminClient.GetQueuesAsync())
{
queueNames.Add(queue.Name);
}
// After registering the ServiceBusClient, register a named factory for each
// queue. This allows them to be lazily created and managed as singleton instances.
services.AddAzureClients(builder =>
{
builder.AddServiceBusClient("<< SERVICE BUS CONNECTION STRING >>");
foreach (var queueName in queueNames)
{
builder.AddClient<ServiceBusSender, ServiceBusClientOptions>((_, _, provider) =>
provider
.GetService<ServiceBusClient>()
.CreateSender(queueName)
)
.WithName(queueName);
}
});
// Register other services, controllers, and other infrastructure.
}
由于发送方为其关联的队列命名,因此注入时,不会直接绑定到它们。 相反,你将绑定到可用于检索命名发件人的工厂:
public class ServiceBusSendingController : ControllerBase
{
private readonly ServiceBusSender _sender;
public ServiceBusSendingController(IAzureClientFactory<ServiceBusSender> serviceBusSenderFactory)
{
// Though the method is called "CreateClient", the factory will manage the sender as a
// singleton, creating a new instance only on the first use.
_sender = serviceBusSenderFactory.CreateClient("<< QUEUE NAME >>");
}
}
有关更多详细信息和示例,请参阅 使用适用于 .NET 的 Azure SDK 进行依赖项注入。
疑难解答
请参阅 服务总线故障排除指南。
后续步骤
除了讨论的介绍性方案之外,Azure 服务总线客户端库还支持其他方案,以帮助利用Azure 服务总线服务的完整功能集。 为了帮助探索其中一些方案,服务总线客户端库提供了一个示例项目,作为常见方案的插图。 有关详细信息,请参阅 示例自述文件 。
贡献
本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com 。
提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。
此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。
有关详细信息,请参阅我们的 贡献指南 。