在会话中对排队消息进行分组
使用 Windows Communication Foundation (WCF) 提供的会话可以将一组相关消息分为一组,以便由单个接收应用程序进行处理。 属于一个会话的消息必须属于同一事务。 因为所有消息都属于同一事务,所以如果有一个消息未能得到处理,整个会话都将回滚。 会话对于死信队列和病毒队列具有类似的行为。 在为会话配置的排队绑定上设置的生存时间 (TTL) 属性被应用于整个会话。 如果在 TTL 过期前仅发送了会话中的一部分消息,则会将整个会话都放到死信队列中。 与此类似,如果会话中有消息未能从应用程序队列发送到应用程序,则会将整个会话都放到病毒队列(如果可用)中。
消息分组示例
如果将订单处理应用程序作为 WCF 服务来实现,将消息分组会十分有用。 例如,某个客户端向此应用程序提交一个包含许多项的订单。 对于每个项,该客户端都要调用一次服务,每次调用都会产生一个要发送的消息。 有可能发生服务器 A 接收第一个项,而服务器 B 接收第二个项的情况。 每次添加一个项时,处理该项的服务器都必须找到相应的订单并将该项添加到订单中 — 这样做效率非常低。 如果只使用单个服务器来处理所有请求,仍然会非常低效,因为该服务器必须跟踪当前正在处理的所有订单并确定新项属于哪个订单。 将单个订单的所有请求分为一组可大大简化这样一个应用程序的实现。 客户端应用程序在一个会话中发送单个订单的所有项,因此当服务处理该订单时,它可以一次性处理整个会话。
过程
设置服务协定以使用会话
定义一个需要会话的服务协定。 为此,可使用 ServiceContractAttribute 属性,方法是指定以下内容:
SessionMode=SessionMode.Required
将该协定中的操作标记为单向操作,因为这些方法不返回任何结果。 可使用 OperationContractAttribute 属性来完成此操作,方法是指定以下内容:
[OperationContract(IsOneWay = true)]
实现该服务协定,并将 InstanceContextMode 指定为 InstanceContextMode.PerSession。 这样,对于每个会话,仅实例化服务一次。
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
每个服务操作都需要一个事务。 可使用 OperationBehaviorAttribute 属性予以指定。 负责完成该事务的操作还应将 TransactionAutoComplete 设置为 true。
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
配置一个终结点,该终结点使用系统提供的 NetMsmqBinding 绑定。
使用 System.Messaging 创建一个事务性队列。 还可以使用消息队列 (MSMQ) 或 MMC 创建该队列。 如果你这样做,请创建一个事务性队列。
使用 ServiceHost 为该服务创建一个服务主机。
打开服务主机使服务处于可用状态。
关闭服务主机。
设置客户端
创建一个要写入事务性队列的事务范围。
使用 ServiceModel 元数据实用工具 (Svcutil.exe) 创建 WCF 客户端。
下订单。
关闭 WCF 客户端。
示例
说明
下面的示例提供 IOrderTaker 服务的代码和一个使用此服务的客户端的代码。 该示例演示 WCF 如何使用排队会话来提供分组行为。
服务代码
// Service Code:
using System;
using System.ServiceModel.Channels;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
using System.Text;
using System.Collections.Generic;
namespace Microsoft.ServiceModel.Samples
{
// Service contract for taking a purchase order.
// SessionMode.Required means the contract can only be used over a session,
// so all calls for one order travel together.
[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode=SessionMode.Required)]
public interface IOrderTaker
{
// Starts a purchase order for the given customer. One-way: no reply is returned.
[OperationContract(IsOneWay = true)]
void OpenPurchaseOrder(string customerId);
// Adds a product and the quantity ordered to the purchase order. One-way: no reply is returned.
[OperationContract(IsOneWay = true)]
void AddProductLineItem(string productId, int quantity);
// Ends the purchase order. One-way: no reply is returned.
[OperationContract(IsOneWay = true)]
void EndPurchaseOrder();
}
// One line of a purchase order: a product, the quantity ordered, and a unit
// cost taken from a shared random generator.
public class PurchaseOrderLineItem
{
    // Shared by all line items; the fixed seed makes the sequence of unit costs repeatable.
    static Random r = new Random(137);

    string ProductId;
    float UnitCost;
    int Quantity;

    // Stores the product and quantity, then draws a non-negative unit cost below 10000.
    public PurchaseOrderLineItem(string productId, int quantity)
    {
        ProductId = productId;
        Quantity = quantity;
        UnitCost = r.Next(10000);
    }

    // One-line, newline-terminated description of this line item.
    public override string ToString()
    {
        return string.Format(
            "Order LineItem: {0} of {1} @unit price: ${2}\n",
            Quantity,
            ProductId,
            UnitCost);
    }

    // Unit cost multiplied by the quantity ordered.
    public float TotalCost
    {
        get
        {
            return UnitCost * Quantity;
        }
    }
}
// A purchase order for one customer: an order number, the customer id, and
// the line items added so far, kept in insertion order.
public class PurchaseOrder
{
    string PONumber;
    string CustomerId;
    LinkedList<PurchaseOrderLineItem> orderLineItems = new LinkedList<PurchaseOrderLineItem>();

    // Creates an empty order for the customer and assigns a new GUID as its number.
    public PurchaseOrder(string customerId)
    {
        CustomerId = customerId;
        PONumber = Guid.NewGuid().ToString();
    }

    // Appends a new line item for the given product and quantity.
    public void AddProductLineItem(string productId, int quantity)
    {
        PurchaseOrderLineItem item = new PurchaseOrderLineItem(productId, quantity);
        orderLineItems.AddLast(item);
    }

    // Sum of the total cost of every line item, accumulated in insertion order.
    public float TotalCost
    {
        get
        {
            float sum = 0;
            for (LinkedListNode<PurchaseOrderLineItem> node = orderLineItems.First; node != null; node = node.Next)
            {
                sum += node.Value.TotalCost;
            }
            return sum;
        }
    }

    // Always reports "Pending".
    public string Status
    {
        get { return "Pending"; }
    }

    // Multi-line report: order number, customer, each line item, total cost and status.
    public override string ToString()
    {
        StringBuilder report = new StringBuilder();
        report.Append("Purchase Order: ").Append(PONumber).Append("\n");
        report.Append("\tCustomer: ").Append(CustomerId).Append("\n");
        report.Append("\tOrderDetails\n");
        for (LinkedListNode<PurchaseOrderLineItem> node = orderLineItems.First; node != null; node = node.Next)
        {
            report.Append("\t\t").Append(node.Value.ToString());
        }
        report.Append("\tTotal cost of this order: $").Append(TotalCost.ToString()).Append("\n");
        report.Append("\tOrder status: ").Append(Status).Append("\n");
        return report.ToString();
    }
}
// Service implementation of IOrderTaker that also hosts itself from Main.
// InstanceContextMode.PerSession creates one service instance per session, so
// the purchase order field is shared by the calls of that session.
// Each operation writes its progress to the console.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class OrderTakerService : IOrderTaker
{
    PurchaseOrder po;

    // Creates the purchase order for this session. Requires a transaction
    // scope but does not auto-complete the transaction.
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
    public void OpenPurchaseOrder(string customerId)
    {
        Console.WriteLine("Creating purchase order");
        po = new PurchaseOrder(customerId);
    }

    // Adds a line item to the current purchase order. Requires a transaction
    // scope but does not auto-complete the transaction.
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
    public void AddProductLineItem(string productId, int quantity)
    {
        po.AddProductLineItem(productId, quantity);
        Console.WriteLine("Product {0} quantity {1} added to purchase order", productId, quantity);
    }

    // Prints the finished purchase order. This is the only operation with
    // TransactionAutoComplete set to true.
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    public void EndPurchaseOrder()
    {
        Console.WriteLine("Purchase Order Completed");
        Console.WriteLine();
        string summary = po.ToString();
        Console.WriteLine(summary);
    }

    // Hosts the service in this console application until ENTER is pressed.
    public static void Main()
    {
        // The queue name comes from the "queueName" app setting.
        string queuePath = ConfigurationManager.AppSettings["queueName"];

        // Create the queue as transactional (second argument true) when it does not exist yet.
        bool queueExists = MessageQueue.Exists(queuePath);
        if (!queueExists)
        {
            MessageQueue.Create(queuePath, true);
        }

        // The base address comes from the "baseAddress" app setting.
        Uri listenAddress = new Uri(ConfigurationManager.AppSettings["baseAddress"]);

        using (ServiceHost host = new ServiceHost(typeof(OrderTakerService), listenAddress))
        {
            // Start listening for messages.
            host.Open();

            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.WriteLine();
            Console.ReadLine();

            // Shut the service down.
            host.Close();
        }
    }
}
}
' Service Code:
Imports System.ServiceModel.Channels
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions
Imports System.Text
Imports System.Collections.Generic
Namespace Microsoft.ServiceModel.Samples
' Service contract for taking a purchase order.
' SessionMode.Required means the contract can only be used over a session,
' so all calls for one order travel together.
<ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples", SessionMode:=SessionMode.Required)> _
Public Interface IOrderTaker
' Starts a purchase order for the given customer. One-way: no reply is returned.
<OperationContract(IsOneWay:=True)> _
Sub OpenPurchaseOrder(ByVal customerId As String)
' Adds a product and the quantity ordered to the purchase order. One-way: no reply is returned.
<OperationContract(IsOneWay:=True)> _
Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)
' Ends the purchase order. One-way: no reply is returned.
<OperationContract(IsOneWay:=True)> _
Sub EndPurchaseOrder()
End Interface
' One line of a purchase order: a product, the quantity ordered, and a unit
' cost taken from a shared random generator.
Public Class PurchaseOrderLineItem
    ' Shared by all line items; the fixed seed makes the sequence of unit costs repeatable.
    Private Shared r As New Random(137)
    Private ProductId As String
    Private UnitCost As Single
    Private Quantity As Integer

    ' Stores the product and quantity, then draws a non-negative unit cost below 10000.
    Public Sub New(ByVal productId As String, ByVal quantity As Integer)
        Me.ProductId = productId
        Me.Quantity = quantity
        Me.UnitCost = r.Next(10000)
    End Sub

    ' Returns a one-line, line-feed-terminated description of this line item.
    Public Overrides Function ToString() As String
        ' Concatenate with & only. "+" binds tighter than "&", so the former
        ' "UnitCost + Constants.vbLf" was evaluated first as Single + String,
        ' which does not compile under Option Strict On and throws
        ' InvalidCastException under Option Strict Off.
        Return "Order LineItem: " & Quantity & " of " & ProductId & " @unit price: $" & UnitCost & Constants.vbLf
    End Function

    ' Unit cost multiplied by the quantity ordered.
    Public ReadOnly Property TotalCost() As Single
        Get
            Return UnitCost * Quantity
        End Get
    End Property
End Class
' A purchase order for one customer: an order number, the customer id, and
' the line items added so far, kept in insertion order.
Public Class PurchaseOrder
    Private PONumber As String
    Private CustomerId As String
    Private orderLineItems As New LinkedList(Of PurchaseOrderLineItem)()

    ' Creates an empty order for the customer and assigns a new GUID as its number.
    Public Sub New(ByVal customerId As String)
        Me.CustomerId = customerId
        Me.PONumber = Guid.NewGuid().ToString()
    End Sub

    ' Appends a new line item for the given product and quantity.
    Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)
        orderLineItems.AddLast(New PurchaseOrderLineItem(productId, quantity))
    End Sub

    ' Sum of the total cost of every line item, accumulated in insertion order.
    Public ReadOnly Property TotalCost() As Single
        Get
            Dim sum As Single = 0
            ' The loop variable is typed explicitly so the code does not rely on Option Infer.
            For Each lineItem As PurchaseOrderLineItem In orderLineItems
                sum += lineItem.TotalCost
            Next lineItem
            Return sum
        End Get
    End Property

    ' Always reports "Pending".
    Public ReadOnly Property Status() As String
        Get
            Return "Pending"
        End Get
    End Property

    ' Multi-line report: order number, customer, each line item, total cost and status.
    Public Overrides Function ToString() As String
        Dim strbuf As New StringBuilder("Purchase Order: " & PONumber & Constants.vbLf)
        strbuf.Append(Constants.vbTab & "Customer: " & CustomerId & Constants.vbLf)
        strbuf.Append(Constants.vbTab & "OrderDetails" & Constants.vbLf)
        For Each lineItem As PurchaseOrderLineItem In orderLineItems
            strbuf.Append(Constants.vbTab & Constants.vbTab & lineItem.ToString())
        Next lineItem
        ' Concatenate with & only. "+" binds tighter than "&", so the former
        ' "TotalCost + Constants.vbLf" was evaluated first as Single + String,
        ' which does not compile under Option Strict On and throws
        ' InvalidCastException under Option Strict Off.
        strbuf.Append(Constants.vbTab & "Total cost of this order: $" & TotalCost & Constants.vbLf)
        strbuf.Append(Constants.vbTab & "Order status: " & Status & Constants.vbLf)
        Return strbuf.ToString()
    End Function
End Class
' Service implementation of IOrderTaker that also hosts itself from Main.
' InstanceContextMode.PerSession creates one service instance per session, so
' the purchase order field is shared by the calls of that session.
' Each operation writes its progress to the console.
<ServiceBehavior(InstanceContextMode:=InstanceContextMode.PerSession)> _
Public Class OrderTakerService
    Implements IOrderTaker

    Private po As PurchaseOrder

    ' Creates the purchase order for this session. Requires a transaction
    ' scope but does not auto-complete the transaction.
    <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=False)> _
    Public Sub OpenPurchaseOrder(ByVal customerId As String) Implements IOrderTaker.OpenPurchaseOrder
        Console.WriteLine("Creating purchase order")
        po = New PurchaseOrder(customerId)
    End Sub

    ' Adds a line item to the current purchase order. Requires a transaction
    ' scope but does not auto-complete the transaction.
    <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=False)> _
    Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer) Implements IOrderTaker.AddProductLineItem
        po.AddProductLineItem(productId, quantity)
        Dim progress As String = "Product " & productId & " quantity " & quantity & " added to purchase order"
        Console.WriteLine(progress)
    End Sub

    ' Prints the finished purchase order. This is the only operation with
    ' TransactionAutoComplete set to True.
    <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
    Public Sub EndPurchaseOrder() Implements IOrderTaker.EndPurchaseOrder
        Console.WriteLine("Purchase Order Completed")
        Console.WriteLine()
        Dim summary As String = po.ToString()
        Console.WriteLine(summary)
    End Sub

    ' Hosts the service in this console application until ENTER is pressed.
    Public Shared Sub Main()
        ' The queue name comes from the "queueName" app setting.
        Dim queuePath As String = ConfigurationManager.AppSettings("queueName")

        ' Create the queue as transactional (second argument True) when it does not exist yet.
        Dim queueExists As Boolean = MessageQueue.Exists(queuePath)
        If Not queueExists Then
            MessageQueue.Create(queuePath, True)
        End If

        ' The base address comes from the "baseAddress" app setting.
        Dim listenAddress As New Uri(ConfigurationManager.AppSettings("baseAddress"))

        Using host As New ServiceHost(GetType(OrderTakerService), listenAddress)
            ' Start listening for messages.
            host.Open()

            Console.WriteLine("The service is ready.")
            Console.WriteLine("Press <ENTER> to terminate service.")
            Console.WriteLine()
            Console.ReadLine()

            ' Shut the service down.
            host.Close()
        End Using
    End Sub
End Class
End Namespace
客户端代码
using System;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
namespace Microsoft.ServiceModel.Samples
{
//The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
//Client implementation code.
// Console client that submits one purchase order as a single session inside
// one transaction, so that the order's messages are committed together.
class Client
{
    // Sends the order, then waits for ENTER before exiting.
    static void Main()
    {
        // All calls below are made inside this transaction scope.
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            // Create a proxy with the given client endpoint configuration.
            OrderTakerClient client = new OrderTakerClient("OrderTakerEndpoint");
            try
            {
                // Open a purchase order.
                client.OpenPurchaseOrder("somecustomer.com");
                Console.WriteLine("Purchase Order created");

                // Add product line items.
                Console.WriteLine("Adding 10 quantities of blue widget");
                client.AddProductLineItem("Blue Widget", 10);
                Console.WriteLine("Adding 23 quantities of red widget");
                client.AddProductLineItem("Red Widget", 23);

                // Close the purchase order.
                Console.WriteLine("Closing the purchase order");
                client.EndPurchaseOrder();
                client.Close();

                // Vote to commit only after every call and the close succeeded.
                // Completing after a failure would commit a partial order.
                scope.Complete();
            }
            catch (CommunicationException ex)
            {
                // Report the failure instead of swallowing it, and abort the
                // client. Complete() is not called on this path, so the
                // transaction rolls back when the scope is disposed.
                Console.Error.WriteLine("Purchase order was not sent: " + ex.Message);
                client.Abort();
            }
        }

        Console.WriteLine();
        Console.WriteLine("Press <ENTER> to terminate client.");
        Console.ReadLine();
    }
}
}
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions
Namespace Microsoft.ServiceModel.Samples
'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
'Client implementation code.
' Console client that submits one purchase order as a single session inside
' one transaction, so that the order's messages are committed together.
Friend Class Client
    ' Sends the order, then waits for ENTER before exiting.
    Shared Sub Main()
        ' All calls below are made inside this transaction scope.
        Using scope As New TransactionScope(TransactionScopeOption.Required)
            ' Create a proxy with the given client endpoint configuration.
            Dim client As New OrderTakerClient("OrderTakerEndpoint")
            Try
                ' Open a purchase order.
                client.OpenPurchaseOrder("somecustomer.com")
                Console.WriteLine("Purchase Order created")

                ' Add product line items.
                Console.WriteLine("Adding 10 quantities of blue widget")
                client.AddProductLineItem("Blue Widget", 10)
                Console.WriteLine("Adding 23 quantities of red widget")
                client.AddProductLineItem("Red Widget", 23)

                ' Close the purchase order.
                Console.WriteLine("Closing the purchase order")
                client.EndPurchaseOrder()
                client.Close()

                ' Vote to commit only after every call and the close succeeded.
                ' Completing after a failure would commit a partial order.
                scope.Complete()
            Catch ex As CommunicationException
                ' Report the failure instead of swallowing it, and abort the
                ' client. Complete() is not called on this path, so the
                ' transaction rolls back when the scope is disposed.
                Console.Error.WriteLine("Purchase order was not sent: " & ex.Message)
                client.Abort()
            End Try
        End Using

        Console.WriteLine()
        Console.WriteLine("Press <ENTER> to terminate client.")
        Console.ReadLine()
    End Sub
End Class
End Namespace