练习 - 使用队列发送和接收消息

已完成

你已选择了使用服务总线队列在销售人员所用的移动应用与托管在 Azure 中、将用于在 Azure SQL 数据库实例中存储有关每笔销售详细信息的 Web 服务之间处理个人销售的消息。

在上一个练习中,你在 Azure 订阅中实现了所需的对象。 现在,你想要编写代码,用于将消息发送到该队列并检索消息。

在此单元中,你将生成两个控制台应用程序,一个将消息放入服务总线队列,另一个从服务总线队列中检索消息。 这两个应用程序属于单个 .NET Core 解决方案。

获取连接到服务总线命名空间的连接字符串

需要在两个控制台应用中配置两条信息,以便访问服务总线命名空间并使用该命名空间中的队列:

  • 命名空间的终结点
  • 用于身份验证的共享访问密钥

可以从连接字符串中获取这些值。

  1. 在 Cloud Shell 窗口屏幕的右上角,选择“更多”图标 (...),然后选择“设置”>“转到经典版本”

  2. 运行以下命令,将 <namespace-name> 替换为在上个练习中创建的服务总线命名空间。

    az servicebus namespace authorization-rule keys list \
        --resource-group "<rgn>[sandbox resource group name]</rgn>" \
        --name RootManageSharedAccessKey \
        --query primaryConnectionString \
        --output tsv \
        --namespace-name <namespace-name>
    

    响应中的最后一行是连接字符串,其中包含命名空间的终结点和共享访问密钥。 它应与下面的示例类似:

    Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxx
    
  3. 从 Cloud Shell 复制连接字符串。 在整个模块中,你会多次用到此连接字符串,因此建议将其保持到某个方便的位置。

克隆并打开初学者应用程序

注意

为了简单起见,以下任务将指导你在两个控制台应用程序的 Program.cs 文件中对连接字符串进行硬编码。 在生产应用程序中,应使用配置文件或 Azure Key Vault 来存储连接字符串。

  1. 在 Cloud Shell 中运行以下命令来克隆 Git 项目解决方案:

    cd ~
    git clone https://github.com/MicrosoftDocs/mslearn-connect-services-together.git
    
  2. 运行以下命令,转到已克隆项目中的 start 文件夹,并打开 Cloud Shell 编辑器:

    cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start
    code .
    

编写代码以将消息发送到队列

  1. 在 Cloud Shell 编辑器中,打开“privatemessagesender/Program.cs”,并找到以下代码行:

    const string ServiceBusConnectionString = "";
    

    将连接字符串粘贴在引号之间。

  2. 如果为队列名称使用了不同于 salesmessages 的名称,请更新代码中 QueueName 属性的值:

    const string QueueName = "salesmessages";
    
  3. 要完成发送销售相关消息的组件,需要添加 await 运算符以暂停对异步方法的评估,直到异步操作完成。 查找 SendSalesMessageAsync() 方法。 在该方法中找到以下代码行:

    // Create a Service Bus client here
    

    请使用以下代码替换该代码行:

    // By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope. 
    // In more realistic scenarios, you would want to store off a class reference to the client (rather than a local variable) so that it can be used throughout your program.
    
    await using var client = new ServiceBusClient(ServiceBusConnectionString);
    
  4. SendSalesMessageAsync() 方法中,找到以下代码行:

    // Create a sender here
    

    使用下面的代码替换该注释:

    await using ServiceBusSender sender = client.CreateSender(QueueName);
    
  5. try...catch 块中,找到以下代码行:

    // Create and send a message here
    

    使用以下代码行替换该代码行:

    string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works.";
    var message = new ServiceBusMessage(messageBody);
    
  6. 直接在刚刚添加的新行下方插入以下代码,以在控制台中显示消息:

    Console.WriteLine($"Sending message: {messageBody}");
    
  7. 将以下代码插入下一行:

    await sender.SendMessageAsync(message);
    
  8. 若要释放发送方和客户端对象,请在文件末尾附近找到一下注释:

    // Close the connection to the sender here
    

    请使用以下代码替换这一行:

    finally
    {
        // Calling DisposeAsync on client types is required to ensure that network
        // resources and other unmanaged objects are properly cleaned up.
        await sender.DisposeAsync();
        await client.DisposeAsync();
    }
    
  9. 检查“privatemessagesender/Program.cs”的最终代码是否类似于以下示例:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace privatemessagesender
    {
        class Program
        {
            const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx";
            const string QueueName = "salesmessages";
    
            static void Main(string[] args)
            {
                Console.WriteLine("Sending a message to the Sales Messages queue...");
                SendSalesMessageAsync().GetAwaiter().GetResult();
                Console.WriteLine("Message was sent successfully.");
            }
    
            static async Task SendSalesMessageAsync()
            {
                await using var client = new ServiceBusClient(ServiceBusConnectionString);
    
                await using ServiceBusSender sender = client.CreateSender(QueueName);
                try
                {
                    string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works.";
                    var message = new ServiceBusMessage(messageBody);
                    Console.WriteLine($"Sending message: {messageBody}");
                    await sender.SendMessageAsync(message);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
                }
                finally
                {
                    // Calling DisposeAsync on client types is required to ensure that network
                    // resources and other unmanaged objects are properly cleaned up.
                    await sender.DisposeAsync();
                    await client.DisposeAsync();
                }
            }
        }
    }
    
  10. 若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。

向队列发送消息

  1. 在 Cloud Shell 中,运行以下命令来发送有关销售的消息。 第一行可确保位于正确路径。

    cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start
    dotnet run --project ./privatemessagesender
    

    注意

    首次运行本练习中的应用时,需要允许 dotnet 还原来自远程源的包并生成应用。

    程序运行时,将有消息输出到控制台,指示应用正在发送消息:

    Sending a message to the Sales Messages queue...
    Sending message: $10,000 order for bicycle parts from retailer Adventure Works.
    Message was sent successfully.
    
  2. 应用完成后,运行以下命令,将 <namespace-name> 替换为服务总线命名空间的名称。 此命令返回队列中的消息数。

    az servicebus queue show \
        --resource-group "<rgn>[sandbox resource group name]</rgn>" \
        --name salesmessages \
        --query messageCount \
        --namespace-name <namespace-name>
    
  3. 再次从步骤 1 运行 dotnet run 命令,然后再次运行 servicebus queue show 命令。 每次运行 dotnet 应用时,都会向队列中添加一条新消息。 每次运行 Azure 命令时,都会看到 messageCount 增加。

编写从队列接收消息的代码

  1. 运行以下命令以再次打开编辑器:

    code .
    
  2. 在编辑器中,打开“privatemessagereceiver/Program.cs”,然后找到以下代码行:

    const string ServiceBusConnectionString = "";
    

    将之前保存的连接字符串粘贴到引号之间。

  3. 查找 ReceiveSalesMessageAsync() 方法。 在该方法中找到以下代码行:

    // Create a Service Bus client that will authenticate using a connection string
    

    请使用以下代码替换这一行:

    var client = new ServiceBusClient(ServiceBusConnectionString);
    
  4. 若要配置消息处理选项,请找到以下代码行:

    // Create the options to use for configuring the processor
    

    将该代码行替换为以下代码行:

    var processorOptions = new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls = 1,
        AutoCompleteMessages = false
    };
    
  5. 若要创建处理器,请找到以下代码行:

    // Create a processor that we can use to process the messages
    

    请使用以下代码替换这一行:

    await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions);
    
  6. 若要配置处理程序,请找到以下代码行:

    // Configure the message and error handler to use
    

    请使用以下代码替换这一行:

    processor.ProcessMessageAsync += MessageHandler;
    processor.ProcessErrorAsync += ErrorHandler;
    
  7. 若要开始处理,请找到以下代码行:

    // Start processing
    

    请使用以下代码替换这一行:

    await processor.StartProcessingAsync();
    
  8. 若要关闭与服务总线的连接,请找到以下代码行:

    // Close the processor here
    

    请使用以下代码替换这一行:

    await processor.CloseAsync();
    
  9. 查看 MessageHandler 方法中的代码:

    // handle received messages
    static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // extract the message
        string body = args.Message.Body.ToString();
    
        // print the message
        Console.WriteLine($"Received: {body}");
    
        // complete the message so that message is deleted from the queue. 
        await args.CompleteMessageAsync(args.Message);
    }
    
  10. 查看 ErrorHandler 方法中的代码:

    // handle any errors when receiving messages
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // print the exception message
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }    
    
  11. 检查“privatemessagereceiver/Program.cs”的最终代码是否类似于以下示例:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    namespace privatemessagereceiver
    {
        class Program
        {
    
            const string ServiceBusConnectionString = "Endpoint=sb://<examplenamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
            const string QueueName = "salesmessages";
    
            static void Main(string[] args)
            {
    
                ReceiveSalesMessageAsync().GetAwaiter().GetResult();
    
            }
    
            static async Task ReceiveSalesMessageAsync()
            {
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
                Console.WriteLine("======================================================");
    
    
                var client = new ServiceBusClient(ServiceBusConnectionString);
    
                var processorOptions = new ServiceBusProcessorOptions
                {
                    MaxConcurrentCalls = 1,
                    AutoCompleteMessages = false
                };
    
                await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions);
    
                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;
    
    
                await processor.StartProcessingAsync();
    
                Console.Read();
    
                await processor.CloseAsync();
    
            }
    
            // handle received messages
            static async Task MessageHandler(ProcessMessageEventArgs args)
            {
                string body = args.Message.Body.ToString();
                Console.WriteLine($"Received: {body}");
    
                // complete the message. messages is deleted from the queue. 
                await args.CompleteMessageAsync(args.Message);
            }
    
            // handle any errors when receiving messages
            static Task ErrorHandler(ProcessErrorEventArgs args)
            {
                Console.WriteLine(args.Exception.ToString());
                return Task.CompletedTask;
            }
        }
    }
    
    
  12. 若要保存更改,请选择 Ctrl+S,然后选择 Ctrl+Q 关闭编辑器。

从队列接收消息

  1. 若要运行用于接收有关销售的消息的组件,请在 Cloud Shell 中运行以下命令:

    dotnet run --project privatemessagereceiver
    
  2. 查看 Cloud Shell 中的通知。 在 Azure 门户中,转到你的服务总线命名空间,然后检查“消息”图表:

    Received: $10,000 order for bicycle parts from retailer Adventure Works.
    
  3. 看到 Cloud Shell 中已收到消息时,按 Enter 停止应用。

检查消息计数

运行以下代码以确认已从队列中删除所有消息,记得将 <namespace-name> 替换为服务总线命名空间。

az servicebus queue show \
    --resource-group "<rgn>[sandbox resource group name]</rgn>" \
    --name salesmessages \
    --query messageCount \
    --namespace-name <namespace-name>

若已删除所有消息,则输出将为 0

现已编写了用于向服务总线队列发送单独销售消息的代码。 在销售人员分布式应用程序中,应在销售人员在设备上使用的移动应用中编写此代码。

此外,还编写了用于从服务总线队列接收消息的代码。 在销售人员分布式应用程序中,应在运行在 Azure 中、用于处理已接收消息的 Web 服务中编写此代码。