練習 - 使用佇列來傳送及接收訊息
您已選擇使用服務匯流排佇列,在您銷售人員所使用的行動應用程式和會將每筆銷售的相關詳細資料儲存在 Azure SQL Database 執行個體中的 Web 服務 (裝載在 Azure 中) 之間,處理個別銷售的相關訊息。
在上一個練習中,您已在 Azure 訂用帳戶中實作必要的物件。 現在,您想要撰寫將訊息傳送至該佇列並擷取訊息的程式碼。
在本單元中,您將建置兩個主控台應用程式:一個應用程式會訊息將放入服務匯流排佇列,一個應用程式則會從服務匯流排佇列擷取訊息。 應用程式是單一 .NET Core 解決方案的一部分。
取得服務匯流排命名空間的連接字串
您必須在兩個主控台應用程式中設定兩項資訊,才能存取您的服務匯流排命名空間,以及使用該命名空間內的佇列:
- 命名空間的端點
- 驗證的共用存取金鑰
您可以從連接字串中取得這些值。
在畫面右側的 Cloud Shell 視窗,選取 [更多] 圖示 ([...]),然後選取 [設定] > [前往傳統版本]。
執行下列命令,並將
<namespace-name>
取代為您在上一個練習中建立的服務匯流排命名空間。az servicebus namespace authorization-rule keys list \ --resource-group "<rgn>[sandbox resource group name]</rgn>" \ --name RootManageSharedAccessKey \ --query primaryConnectionString \ --output tsv \ --namespace-name <namespace-name>
回應中的最後一行就是連接字串,其中包含命名空間和共用存取金鑰的端點。 其應該類似下列範例:
Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxx
複製 Cloud Shell 的連接字串。 在本模組中您將需要多次用到此連接字串,因此建議您將其儲存至某個方便的位置。
複製並開啟起始應用程式
注意
為了簡單起見,下列工作會指示您將連接字串硬式編碼到兩個主控台應用程式的 Program.cs 檔案中。 在生產應用程式中,您應使用設定檔或 Azure Key Vault 來儲存連接字串。
在 Cloud Shell 中執行下列命令,以複製 Git 專案解決方案:
cd ~ git clone https://github.com/MicrosoftDocs/mslearn-connect-services-together.git
執行下列命令,以移至複製專案中的 start 資料夾,然後開啟 Cloud Shell 編輯器:
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start code .
撰寫程式碼以將訊息傳送至佇列
在 Cloud Shell 編輯器中,開啟 privatemessagesender/Program.cs 並尋找下列程式碼行:
const string ServiceBusConnectionString = "";
複製引號間的連接字串。
如果您使用與 salesmessages 不同的名稱作為佇列名稱,請更新程式碼中的
QueueName
屬性值:const string QueueName = "salesmessages";
若要完成傳送銷售相關訊息的元件,您必須新增
await
運算子以暫停評估非同步方法,直到非同步作業完成為止。 尋找SendSalesMessageAsync()
方法。 在該方法中,找出下列程式碼行:// Create a Service Bus client here
使用下列程式碼取代該程式碼:
// By leveraging "await using", the DisposeAsync method will be called automatically once the client variable goes out of scope. // In more realistic scenarios, you would want to store off a class reference to the client (rather than a local variable) so that it can be used throughout your program. await using var client = new ServiceBusClient(ServiceBusConnectionString);
在
SendSalesMessageAsync()
方法內尋找下列程式碼:// Create a sender here
以下列程式碼取代該註解:
await using ServiceBusSender sender = client.CreateSender(QueueName);
在
try...catch
區塊內,尋找下列程式碼:// Create and send a message here
使用下列程式碼取代該程式碼:
string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works."; var message = new ServiceBusMessage(messageBody);
將下列程式碼直接插入您剛才新增至主控台中以顯示訊息下方的新行:
Console.WriteLine($"Sending message: {messageBody}");
在下一行插入下列程式碼:
await sender.SendMessageAsync(message);
若要處置傳送者和用戶端物件,請在檔案結尾附近尋找下列註解:
// Close the connection to the sender here
使用下列程式碼取代該行:
finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); }
檢查您 privatemessagesender/Program.cs 的最終程式碼是否類似下列範例:
using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace privatemessagesender { class Program { const string ServiceBusConnectionString = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx"; const string QueueName = "salesmessages"; static void Main(string[] args) { Console.WriteLine("Sending a message to the Sales Messages queue..."); SendSalesMessageAsync().GetAwaiter().GetResult(); Console.WriteLine("Message was sent successfully."); } static async Task SendSalesMessageAsync() { await using var client = new ServiceBusClient(ServiceBusConnectionString); await using ServiceBusSender sender = client.CreateSender(QueueName); try { string messageBody = $"$10,000 order for bicycle parts from retailer Adventure Works."; var message = new ServiceBusMessage(messageBody); Console.WriteLine($"Sending message: {messageBody}"); await sender.SendMessageAsync(message); } catch (Exception exception) { Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}"); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); } } } }
若要儲存變更,請選取 Ctrl+S,然後選取 Ctrl+Q 以關閉編輯器。
將訊息傳送至佇列
在 Cloud Shell 中,執行下列命令來傳送有關銷售的訊息。 第一行可確保您位於正確的路徑。
cd ~/mslearn-connect-services-together/implement-message-workflows-with-service-bus/src/start dotnet run --project ./privatemessagesender
注意
您第一次在本練習中執行應用程式時,請允許
dotnet
從遠端來源還原套件並建置應用程式。當程式執行時,會將訊息列印至主控台,指出應用程式正在傳送訊息:
Sending a message to the Sales Messages queue... Sending message: $10,000 order for bicycle parts from retailer Adventure Works. Message was sent successfully.
當應用程式完成時,請執行下列命令,以您服務匯流排命名空間的名稱取代 <namespace-name>。 此命令會傳回佇列中的訊息數目。
az servicebus queue show \ --resource-group "<rgn>[sandbox resource group name]</rgn>" \ --name salesmessages \ --query messageCount \ --namespace-name <namespace-name>
再次執行步驟 1 中的
dotnet run
命令,然後再次執行servicebus queue show
命令。 每次執行 dotnet 應用程式時,都會有一則新訊息新增至佇列。 每次執行 Azure 命令時,您都會看到messageCount
增加。
撰寫程式碼以從佇列接收訊息
再次執行下列命令以開啟編輯器:
code .
在編輯器中,開啟 privatemessagesender/Program.cs 並尋找下列程式碼:
const string ServiceBusConnectionString = "";
在引號之間貼上您稍早儲存的連接字串。
尋找
ReceiveSalesMessageAsync()
方法。 在該方法中,找出下列程式碼行:// Create a Service Bus client that will authenticate using a connection string
使用下列程式碼取代該行:
var client = new ServiceBusClient(ServiceBusConnectionString);
若要設定訊息處理選項,請尋找下列程式碼:
// Create the options to use for configuring the processor
使用下列程式碼取代該程式碼:
var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false };
若要建立處理器,請尋找下列程式碼:
// Create a processor that we can use to process the messages
使用下列程式碼取代該行:
await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions);
若要設定處理常式,請尋找下列程式碼:
// Configure the message and error handler to use
使用下列程式碼取代該行:
processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler;
若要開始處理,請尋找下列程式碼:
// Start processing
使用下列程式碼取代該行:
await processor.StartProcessingAsync();
若要關閉與服務匯流排的連線,請尋找下列程式碼:
// Close the processor here
使用下列程式碼取代該行:
await processor.CloseAsync();
檢閱
MessageHandler
方法中的程式碼:// handle received messages static async Task MessageHandler(ProcessMessageEventArgs args) { // extract the message string body = args.Message.Body.ToString(); // print the message Console.WriteLine($"Received: {body}"); // complete the message so that message is deleted from the queue. await args.CompleteMessageAsync(args.Message); }
檢閱
ErrorHandler
方法中的程式碼:// handle any errors when receiving messages static Task ErrorHandler(ProcessErrorEventArgs args) { // print the exception message Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }
檢查您 privatemessagereceiver/Program.cs 的最終程式碼是否類似下列範例:
using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; namespace privatemessagereceiver { class Program { const string ServiceBusConnectionString = "Endpoint=sb://<examplenamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; const string QueueName = "salesmessages"; static void Main(string[] args) { ReceiveSalesMessageAsync().GetAwaiter().GetResult(); } static async Task ReceiveSalesMessageAsync() { Console.WriteLine("======================================================"); Console.WriteLine("Press ENTER key to exit after receiving all the messages."); Console.WriteLine("======================================================"); var client = new ServiceBusClient(ServiceBusConnectionString); var processorOptions = new ServiceBusProcessorOptions { MaxConcurrentCalls = 1, AutoCompleteMessages = false }; await using ServiceBusProcessor processor = client.CreateProcessor(QueueName, processorOptions); processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler; await processor.StartProcessingAsync(); Console.Read(); await processor.CloseAsync(); } // handle received messages static async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); Console.WriteLine($"Received: {body}"); // complete the message. messages is deleted from the queue. await args.CompleteMessageAsync(args.Message); } // handle any errors when receiving messages static Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; } } }
若要儲存變更,請選取 Ctrl+S,然後選取 Ctrl+Q 以關閉編輯器。
從佇列接收訊息
若要執行可接收銷售相關訊息的元件,請在 Cloud Shell 中執行此命令:
dotnet run --project privatemessagereceiver
檢查 Cloud Shell 中的通知。 在 Azure 入口網站中,移至您的服務匯流排命名空間,然後檢查您的訊息圖表:
Received: $10,000 order for bicycle parts from retailer Adventure Works.
看到 Cloud Shell 已接收訊息後,請按 Enter 停止應用程式。
檢查訊息計數
執行下列程式碼以確認已將所有訊息從佇列中移除,並記得以您的服務匯流排命名空間來取代 <namespace-name>。
az servicebus queue show \
--resource-group "<rgn>[sandbox resource group name]</rgn>" \
--name salesmessages \
--query messageCount \
--namespace-name <namespace-name>
如果所有訊息都已移除,則輸出為 0
。
您已撰寫程式碼,將個人的銷售訊息傳送給服務匯流排佇列。 在銷售人員分散式應用程式中,您應在銷售人員用於裝置上的行動應用程式中撰寫此程式碼。
您也已撰寫從服務匯流排佇列接收訊息的程式碼。 在銷售人員分散式應用程式中,您應在執行於 Azure 中且對接收到的訊息進行處理的 Web 服務中撰寫此程式碼。