练习 - 使用队列发送和接收消息
你已选择了使用服务总线队列在销售人员所用的移动应用与托管在 Azure 中、将用于在 Azure SQL 数据库实例中存储有关每笔销售详细信息的 Web 服务之间处理个人销售的消息。
在上一个练习中,你在 Azure 订阅中实现了所需的对象。 现在,你想要编写代码,用于将消息发送到该队列并检索消息。
在此单元中,你将生成两个控制台应用程序,一个将消息放入服务总线队列,另一个从服务总线队列中检索消息。 这两个应用程序属于单个 .NET Core 解决方案。
获取连接到服务总线命名空间的连接字符串
需要在两个控制台应用中配置两条信息,以便访问服务总线命名空间并使用该命名空间中的队列:
- 命名空间的终结点
- 用于身份验证的共享访问密钥
可以从连接字符串中获取这些值。
在 Cloud Shell 窗口屏幕的右上角,选择“更多”图标 (...),然后选择“设置”>“转到经典版本”。
运行以下命令,将
<namespace-name>
替换为在上个练习中创建的服务总线命名空间。az servicebus namespace authorization-rule keys list \ --resource-group "<rgn>[sandbox resource group name]</rgn>" \ --name RootManageSharedAccessKey \ --query primaryConnectionString \ --output tsv \ --namespace-name <namespace-name>
响应中的最后一行是连接字符串,其中包含命名空间的终结点和共享访问密钥。 它应与下面的示例类似:
Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxx
从 Cloud Shell 复制连接字符串。 在整个模块中,你会多次用到此连接字符串,因此建议将其保持到某个方便的位置。
克隆并打开初学者应用程序
注意
为了简单起见,以下任务将指导你在两个控制台应用程序的 Program.cs 文件中对连接字符串进行硬编码。 在生产应用程序中,应使用配置文件或 Azure Key Vault 来存储连接字符串。
在 Cloud Shell 中运行以下命令来克隆 Git 项目解决方案:
cd ~ git clone https://github.com/MicrosoftDocs/mslearn-connect-services-together.git
运行以下命令,转到已克隆项目中的 start 文件夹,并打开 Cloud Shell 编辑器:
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start code .
编写代码以将消息发送到队列
在 Cloud Shell 编辑器中,打开“privatemessagesender/Program.cs”,并找到以下代码行:
const string ServiceBusConnectionString = "";
将连接字符串粘贴在引号之间。
如果为队列名称使用了不同于 salesmessages 的名称,请更新代码中
QueueName
属性的值:const string QueueName = "salesmessages";
要完成发送销售相关消息的组件,需要添加
await
运算符以暂停对异步方法的评估,直到异步操作完成。 查找SendSalesMessageAsync()
方法。 在该方法中找到以下代码行:// Create a Service Bus client here
请使用以下代码替换该代码行:
// By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope. // In more realistic scenarios, you would want to store off a class reference to the client (rather than a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString);
在
SendSalesMessageAsync()
方法中,找到以下代码行:// Create a sender here
使用下面的代码替换该注释:
await using ServiceBusSender sender = client.CreateSender(QueueName);
在
try...catch
块中,找到以下代码行:// Create and send a message here
使用以下代码行替换该代码行:
string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works."; var message = new ServiceBusMessage(messageBody);
直接在刚刚添加的新行下方插入以下代码,以在控制台中显示消息:
Console.WriteLine($"Sending message: {messageBody}");
将以下代码插入下一行:
await sender.SendMessageAsync(message);
若要释放发送方和客户端对象,请在文件末尾附近找到一下注释:
// Close the connection to the sender here
请使用以下代码替换这一行:
finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); }
检查“privatemessagesender/Program.cs”的最终代码是否类似于以下示例:
using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace privatemessagesender { class Program { const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string QueueName = "salesmessages"; static void Main(string[] args) { Console.WriteLine("Sending a message to the Sales Messages queue..."); SendSalesMessageAsync().GetAwaiter().GetResult(); Console.WriteLine("Message was sent successfully."); } static async Task SendSalesMessageAsync() { await using var client = new ServiceBusClient(ServiceBusConnectionString); await using ServiceBusSender sender = client.CreateSender(QueueName); try { string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works."; var message = new ServiceBusMessage(messageBody); Console.WriteLine($"Sending message: {messageBody}"); await sender.SendMessageAsync(message); } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); } } } }
若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。
向队列发送消息
在 Cloud Shell 中,运行以下命令来发送有关销售的消息。 第一行可确保位于正确路径。
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start dotnet run --project ./privatemessagesender
注意
首次运行本练习中的应用时,需要允许
dotnet
还原来自远程源的包并生成应用。程序运行时,将有消息输出到控制台,指示应用正在发送消息:
Sending a message to the Sales Messages queue... Sending message: $10,000 order for bicycle parts from retailer Adventure Works. Message was sent successfully.
应用完成后,运行以下命令,将 <namespace-name> 替换为服务总线命名空间的名称。 此命令返回队列中的消息数。
az servicebus queue show \ --resource-group "<rgn>[sandbox resource group name]</rgn>" \ --name salesmessages \ --query messageCount \ --namespace-name <namespace-name>
再次从步骤 1 运行
dotnet run
命令,然后再次运行servicebus queue show
命令。 每次运行 dotnet 应用时,都会向队列中添加一条新消息。 每次运行 Azure 命令时,都会看到messageCount
增加。
编写从队列接收消息的代码
运行以下命令以再次打开编辑器:
code .
在编辑器中,打开“privatemessagereceiver/Program.cs”,然后找到以下代码行:
const string ServiceBusConnectionString = "";
将之前保存的连接字符串粘贴到引号之间。
查找
ReceiveSalesMessageAsync()
方法。 在该方法中找到以下代码行:// Create a Service Bus client that will authenticate using a connection string
请使用以下代码替换这一行:
var client = new ServiceBusClient(ServiceBusConnectionString);
若要配置消息处理选项,请找到以下代码行:
// Create the options to use for configuring the processor
将该代码行替换为以下代码行:
var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false };
若要创建处理器,请找到以下代码行:
// Create a processor that we can use to process the messages
请使用以下代码替换这一行:
await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions);
若要配置处理程序,请找到以下代码行:
// Configure the message and error handler to use
请使用以下代码替换这一行:
processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler;
若要开始处理,请找到以下代码行:
// Start processing
请使用以下代码替换这一行:
await processor.StartProcessingAsync();
若要关闭与服务总线的连接,请找到以下代码行:
// Close the processor here
请使用以下代码替换这一行:
await processor.CloseAsync();
查看
MessageHandler
方法中的代码:// handle received messages static async Task MessageHandler(ProcessMessageEventArgs args) { // extract the message string body = args.Message.Body.ToString(); // print the message Console.WriteLine($"Received: {body}"); // complete the message so that message is deleted from the queue. await args.CompleteMessageAsync(args.Message); }
查看
ErrorHandler
方法中的代码:// handle any errors when receiving messages static Task ErrorHandler(ProcessErrorEventArgs args) { // print the exception message Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }
检查“privatemessagereceiver/Program.cs”的最终代码是否类似于以下示例:
using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace privatemessagereceiver { class Program { const string ServiceBusConnectionString = "Endpoint=sb://<examplenamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; const string QueueName = "salesmessages"; static void Main(string[] args) { ReceiveSalesMessageAsync().GetAwaiter().GetResult(); } static async Task ReceiveSalesMessageAsync() { Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); var client = new ServiceBusClient(ServiceBusConnectionString); var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false }; await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions); processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler; await processor.StartProcessingAsync(); Console.Read(); await processor.CloseAsync(); } // handle received messages static async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); Console.WriteLine($"Received: {body}"); // complete the message. messages is deleted from the queue. await args.CompleteMessageAsync(args.Message); } // handle any errors when receiving messages static Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; } } }
若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。
从队列接收消息
若要运行用于接收有关销售的消息的组件,请在 Cloud Shell 中运行以下命令:
dotnet run --project privatemessagereceiver
查看 Cloud Shell 中的通知。 在 Azure 门户中,转到你的服务总线命名空间,然后检查“消息”图表:
Received: $10,000 order for bicycle parts from retailer Adventure Works.
看到 Cloud Shell 中已收到消息时,按 Enter 停止应用。
检查消息计数
运行以下代码以确认已从队列中删除所有消息,记得将 <namespace-name> 替换为服务总线命名空间。
az servicebus queue show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--name salesmessages \
--query messageCount \
--namespace-name <namespace-name>
若已删除所有消息,则输出将为 0
。
现已编写了用于向服务总线队列发送单独销售消息的代码。 在销售人员分布式应用程序中,应在销售人员在设备上使用的移动应用中编写此代码。
此外,还编写了用于从服务总线队列接收消息的代码。 在销售人员分布式应用程序中,应在运行在 Azure 中、用于处理已接收消息的 Web 服务中编写此代码。