练习 - 使用主题发送和接收消息
你已决定使用 Azure 服务总线主题在销售人员应用程序中分发销售业绩消息。 销售人员将在其移动设备上使用应用发送根据每个区域和时间段汇总了销售数字的消息。 这些消息会分发到位于公司运营区域(包括美洲和欧洲)的 Web 服务。
你已在 Azure 订阅中为该主题实现了所需的基础结构。 现在,你要编写用于将消息发送到主题的代码以及从订阅检索消息的代码。 然后,向主题发送消息并检索特定订阅的消息。
在 Azure Cloud Shell 中运行以下命令,以确保在正确的目录中操作:
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start
code .
编写将消息发送到主题的代码
若要完成用于发送有关销售业绩的消息的组件,请完成以下步骤:
在 Azure Cloud Shell 编辑器中,打开“performancemessagesender/Program.cs”,并找到以下代码行:
const string ServiceBusConnectionString = "";
将你在上一个练习中保存的连接字符串粘贴到引号之间。
如果使用不同于 salesmessages 的名称作为队列名称,请更新代码中
TopicName
属性的值:const string TopicName = "salesperformancemessages";
查找
SendPerformanceMessageAsync()
方法。 (提示:它位于第 26 行或附近。)在该方法中,找到以下代码行:// Create a Service Bus client here
将该代码行替换为以下代码:
// By leveraging "await using", the DisposeAsync method will be called automatically when the client variable goes out of scope. // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString);
在
SendPerformanceMessageAsync()
方法中,找到以下代码行:// Create a sender here
将该代码行替换为以下代码:
await using ServiceBusSender sender = client.CreateSender(TopicName);
在
try...catch
块中,找到以下代码行:// Create and send a message here
将该代码行替换为以下代码:
string messageBody = "Total sales for Brazil in August: $13m."; var message = new ServiceBusMessage(messageBody);
若要在控制台中显示消息,请在下一行中插入以下代码:
Console.WriteLine($"Sending message: {messageBody}");
若要将消息发送到主题,请在下一行中插入以下代码:
await sender.SendMessageAsync(message);
检查最终代码是否类似于以下示例:
using System; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace performancemessagesender { class Program { const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string TopicName = "salesperformancemessages"; static void Main(string[] args) { Console.WriteLine("Sending a message to the Sales Performance topic..."); SendPerformanceMessageAsync().GetAwaiter().GetResult(); Console.WriteLine("Message was sent successfully."); } static async Task SendPerformanceMessageAsync() { // By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope. // In more realistic scenarios, you would store off a class reference to the client (rather than to a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString); await using ServiceBusSender sender = client.CreateSender(TopicName); try { string messageBody = "Total sales for Brazil in August: $13m."; var message = new ServiceBusMessage(messageBody); Console.WriteLine($"Sending message: {messageBody}"); await sender.SendMessageAsync(message); } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } } } }
若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。
向主题发送消息
若要运行用于发送有关销售的消息的组件,请在 Cloud Shell 中运行以下命令:
dotnet run --project performancemessagesender
程序执行时,监视 Cloud Shell 中表明消息正在发送的通知。 每次运行应用时,主题中会添加另一条消息,并且每个订阅都有一个副本。
Sending a message to the Sales Performance topic... Sending message: Total sales for Brazil in August: $13m. Message was sent successfully.
在检索订阅的消息之前检查消息计数
看到 Message was sent successfully
后,请运行以下命令以查看 Americas
订阅中的消息数。 请记住将 <namespace-name> 替换为服务总线命名空间。
az servicebus topic subscription show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--topic-name salesperformancemessages \
--name Americas \
--query messageCount \
--namespace-name <namespace-name>
如果将 Americas
替换为 EuropeAndAsia
,然后再次运行该命令,你将看到两个订阅的消息数相同。
编写检索订阅的主题消息的代码
若要创建用于检索有关销售业绩的消息的组件,请完成以下步骤:
运行
code .
以启动编辑器。在编辑器中,打开“performancemessagereceiver/Program.cs”,然后找到以下代码行:
const string ServiceBusConnectionString = "";
将你在上一个练习中保存的连接字符串粘贴到引号之间。
若要创建服务总线客户端,请查找
MainAsync()
方法。 在该方法中找到以下代码行:// Create a Service Bus client that will authenticate using a connection string
将该行替换为此代码:
var client = new ServiceBusClient(ServiceBusConnectionString);
若要配置消息处理选项,请找到以下代码行:
// Create the options to use for configuring the processor
将该行替换为此代码:
var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false };
若要创建处理器,请找到以下代码行:
// Create a processor that we can use to process the messages
将该行替换为此代码:
ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions);
若要配置处理程序,请找到以下代码行:
// Configure the message and error handler to use
将该行替换为此代码:
processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler;
若要开始处理,请找到以下代码行:
// Start processing
将该行替换为此代码:
await processor.StartProcessingAsync();
查找下面的代码行:
// Since we didn't use the "await using" syntax here, we need to explicitly dispose the processor and client
将该行替换为此代码:
await processor.DisposeAsync(); await client.DisposeAsync();
若要在控制台中显示传入消息,请查找
MessageHandler()
方法。 你已注册此方法来处理传入消息。将该方法内的所有代码替换为以下代码:
Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}");
若要从订阅中删除收到的消息,请在下一行中添加以下代码:
await args.CompleteMessageAsync(args.Message);
检查最终代码是否类似于以下示例:
using System; using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace performancemessagereceiver { class Program { const string ServiceBusConnectionString = "Endpoint=sb://alexgeddyneil.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string TopicName = "salesperformancemessages"; const string SubscriptionName = "Americas"; static void Main(string[] args) { MainAsync().GetAwaiter().GetResult(); } static async Task MainAsync() { var client = new ServiceBusClient(ServiceBusConnectionString); Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false }; ServiceBusProcessor processor = client.CreateProcessor(TopicName, SubscriptionName, processorOptions); processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler; await processor.StartProcessingAsync(); Console.Read(); await processor.DisposeAsync(); await client.DisposeAsync(); } static async Task MessageHandler(ProcessMessageEventArgs args) { Console.WriteLine($"Received message: SequenceNumber:{args.Message.SequenceNumber} Body:{args.Message.Body}"); await args.CompleteMessageAsync(args.Message); } static Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine($"Message handler encountered an exception {args.Exception}."); Console.WriteLine("Exception context for troubleshooting:"); Console.WriteLine($"- Endpoint: {args.FullyQualifiedNamespace}"); Console.WriteLine($"- Entity Path: {args.EntityPath}"); Console.WriteLine($"- Executing Action: {args.ErrorSource}"); return Task.CompletedTask; } } }
若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。
检索订阅的主题消息
若要运行用于检索有关订阅销售业绩的消息的组件,请运行以下命令:
dotnet run --project performancemessagereceiver
将会看到类似于以下示例的输出:
Received message: SequenceNumber:1 Body:Total sales for Brazil in August: $13m.
当程序已返回正在接收消息的通知时,按 Enter 停止应用。
检索订阅的消息后检查消息计数
运行以下命令,以确认 Americas
订阅中没有剩余的消息。 请确保将 <namespace-name> 替换为服务总线命名空间。
az servicebus topic subscription show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--topic-name salesperformancemessages \
--name Americas \
--query messageCount \
--namespace-name <namespace-name>
如果在此代码中将 Americas
替换为 EuropeAndAsia
以查看 EuropeAndAsia
订阅的当前消息计数,你将看到消息计数为 1
。 在以上代码中,仅设置了 Americas
以检索主题消息,因此消息仍在等待 EuropeAndAsia
检索它。