配信不能キューを使用したメッセージ転送エラー処理
キューに置かれたメッセージは、配信に失敗する可能性があります。 配信に失敗したメッセージは、配信不能キューに記録されます。 配信の失敗は、ネットワーク エラー、キューが削除されている、キューがいっぱいになっている、認証エラー、配信が時間どおりに行われなかったなど、さまざまな理由で生じる可能性があります。
キューに置かれたメッセージは、受信側のアプリケーションでタイムリーに読み取られないと、長時間キューに残ることがあります。 時間依存のメッセージでは、このような動作が適切でない場合があります。 時間依存のメッセージでは、メッセージをキューに格納しておくことができる期間を示す TTL (Time to Live) プロパティが、キューに置かれたバインディングに設定されおり、この期間を超えると、メッセージの有効期限が切れます。 期限切れのメッセージは、配信不能キューと呼ばれる特別なキューに送信されます。 また、キューのクォータの超過、認証エラーなどの理由で、メッセージが配信不能キューに置かれる場合もあります。
一般に、アプリケーションには、配信不能キューのメッセージとエラーの理由を読み取るための補正ロジックが記述されています。 補正ロジックはエラーの原因に依存します。 たとえば、認証エラーの場合は、メッセージに添付された証明書を修正し、メッセージを再送信できます。 また、ターゲット キューのクォータに達したために配信が失敗した場合は、クォータの問題が解決されていることを期待して配信を再試行できます。
ほとんどのキュー システムには、そのシステムから配信できなかったすべてのメッセージを格納するための、システム全体の配信不能キューがあります。 メッセージ キュー (MSMQ) には、システム全体の配信不能キューが 2 種類用意されています。1 つはトランザクション キューへの配信に失敗したメッセージを格納するトランザクション システム全体の配信不能キューで、もう 1 つは非トランザクション キューへの配信に失敗したメッセージを格納する非トランザクション システム全体の配信不能キューです。 2 つのクライアントで 2 つの異なるサービスにメッセージを送信しているために、WCF の異なるキューで、送信に同じ MSMQ サービスを共有している場合は、システム配信不能キューでメッセージが混在する可能性があります。 これは、必ずしも最適であるとは言えません。 場合によっては (たとえば、セキュリティ上の理由で)、一方のクライアントが、もう一方のクライアントのメッセージを配信不能キューから読み取ることができないようにする必要があるからです。 また、共有された配信不能キューでは、クライアントがキューを参照して、送信したメッセージを検索する必要もありますが、配信不能キューに置かれているメッセージの数によっては、これは極めて大きな負荷になる可能性があります。 そのため、Windows Vista の WCFNetMsmqBinding
、MsmqIntegrationBinding,
、MSMQ などには、カスタム配信不能キュー (アプリケーション固有の配信不能キューと呼ばれることもあります) が用意されています。
カスタム配信不能キューでは、同じ MSMQ サービスを共有してメッセージを送信するクライアントをそれぞれ分離できます。
Windows Communication Foundation (WCF) の場合、Windows Server 2003 と Windows XP では、キューに置かれたすべてのクライアント アプリケーションに対して、システム全体で 1 つの配信不能キューが提供されます。 一方、Windows Vista では、WCF により、キューに置かれたクライアント アプリケーションごとに配信不能キューが提供されます。
配信不能キューの使用の指定
配信不能キューは、送信元アプリケーションのキュー マネージャーに存在します。 ここには、有効期限が切れたメッセージと転送または配信に失敗したメッセージが格納されます。
バインディングには、次の配信不能キュー プロパティがあります。
配信不能キューからのメッセージの読み取り
配信不能キューからメッセージを読み取るアプリケーションは、アプリケーション キューから読み取る WCF サービスとよく似ていますが、次のようなわずかな違いがあります。
トランザクション システムの配信不能キューからメッセージを読み取るには、URI (Uniform Resource Identifier) を net.msmq://localhost/system$;DeadXact という形式にする必要があります。
非トランザクション システムの配信不能キューからメッセージを読み取る場合、URI は、net.msmq://localhost/system$;DeadLetter という形式にする必要があります。
カスタム配信不能キューからメッセージを読み取るには、URI を net.msmq://localhost/private/<custom-dlq-name> という形式にする必要があります。ここで、custom-dlq-name は、カスタム配信不能キューの名前です。
キューのアドレス指定方法の詳細については、「サービス エンドポイントとキューのアドレス指定」を参照してください。
受信側の WCF スタックでは、サービスがリッスンしているアドレスとメッセージのアドレスが照合されます。 アドレスが一致する場合、メッセージはディスパッチされますが、一致しない場合はディスパッチされません。 これにより、配信不能キューから読み取るときに問題が生じる可能性があります。一般に、配信不能キュー内のメッセージは該当サービスにアドレス指定され、配信不能キュー サービスにアドレス指定されないからです。 そのため、配信不能キューから読み取るサービスは、ServiceBehavior
アドレス フィルターをインストールし、受信者とは無関係にキュー内のすべてのメッセージを一致させるようスタックに指示する必要があります。 具体的には、ServiceBehavior
パラメーターを持つ Any を、配信不能キューからメッセージを読み取るサービスに追加する必要があります。
配信不能キューの有害メッセージの処理
配信不能キューでは、条件付きで有害メッセージを処理できます。 システム キューからサブキューを作成できないため、システム配信不能キューから読み取るときは、ReceiveErrorHandling
を Move
に設定できません。 カスタム配信不能キューから読み取る場合は、サブキューを使用できるので、Move
が有害メッセージに対する有効な処置であることに注意してください。
ReceiveErrorHandling
を Reject
に設定しているときにカスタム配信不能キューから読み取った場合、有害メッセージはシステム配信不能キューに置かれます。 システム配信不能キューから読み取ると、有害メッセージは破棄 (削除) されます。 MSMQ のシステム配信不能キューから拒否すると、メッセージは破棄 (削除) されます。
例
次の例は、配信不能キューを作成する方法と、配信不能キューを使用して期限切れのメッセージを処理する方法を示しています。 この例は、「方法: WCF エンドポイントを使用してキューに置かれたメッセージを交換する」の例に基づいています。 この例では、アプリケーションごとの配信不能キューを使用する発注書処理サービスにクライアント コードを書き込む方法を示します。 また、配信不能キューのメッセージを処理する方法も示します。
以下は、アプリケーションごとの配信不能キューを指定するクライアントのコードです。
using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;
namespace Microsoft.ServiceModel.Samples
{
//The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
//Client implementation code.
class Client
{
static void Main()
{
// Get MSMQ queue name from appsettings in configuration.
string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];
// Create the transacted MSMQ queue for storing dead message if necessary.
if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
System.Messaging.MessageQueue.Create(deadLetterQueueName, true);
OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
try
{
// Create the purchase order.
PurchaseOrder po = new PurchaseOrder();
po.CustomerId = "somecustomer.com";
po.PONumber = Guid.NewGuid().ToString();
PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
lineItem1.ProductId = "Blue Widget";
lineItem1.Quantity = 54;
lineItem1.UnitCost = 29.99F;
PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
lineItem2.ProductId = "Red Widget";
lineItem2.Quantity = 890;
lineItem2.UnitCost = 45.89F;
po.orderLineItems = new PurchaseOrderLineItem[2];
po.orderLineItems[0] = lineItem1;
po.orderLineItems[1] = lineItem2;
//Create a transaction scope.
using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
{
// Make a queued call to submit the purchase order.
client.SubmitPurchaseOrder(po);
// Complete the transaction.
scope.Complete();
}
client.Close();
}
catch(TimeoutException timeout)
{
Console.WriteLine(timeout.Message);
client.Abort();
}
catch(CommunicationException conexcp)
{
Console.WriteLine(conexcp.Message);
client.Abort();
}
Console.WriteLine();
Console.WriteLine("Press <ENTER> to terminate client.");
Console.ReadLine();
}
}
}
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions
Namespace Microsoft.ServiceModel.Samples
'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
'Client implementation code.
Friend Class Client
Shared Sub Main()
' Get MSMQ queue name from appsettings in configuration.
Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")
' Create the transacted MSMQ queue for storing dead message if necessary.
If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
End If
Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
Try
' Create the purchase order.
Dim po As New PurchaseOrder()
po.CustomerId = "somecustomer.com"
po.PONumber = Guid.NewGuid().ToString()
Dim lineItem1 As New PurchaseOrderLineItem()
lineItem1.ProductId = "Blue Widget"
lineItem1.Quantity = 54
lineItem1.UnitCost = 29.99F
Dim lineItem2 As New PurchaseOrderLineItem()
lineItem2.ProductId = "Red Widget"
lineItem2.Quantity = 890
lineItem2.UnitCost = 45.89F
po.orderLineItems = New PurchaseOrderLineItem(1) {}
po.orderLineItems(0) = lineItem1
po.orderLineItems(1) = lineItem2
'Create a transaction scope.
Using scope As New TransactionScope(TransactionScopeOption.Required)
' Make a queued call to submit the purchase order.
client.SubmitPurchaseOrder(po)
' Complete the transaction.
scope.Complete()
End Using
client.Close()
Catch timeout As TimeoutException
Console.WriteLine(timeout.Message)
client.Abort()
Catch conexcp As CommunicationException
Console.WriteLine(conexcp.Message)
client.Abort()
End Try
Console.WriteLine()
Console.WriteLine("Press <ENTER> to terminate client.")
Console.ReadLine()
End Sub
End Class
End Namespace
以下は、クライアント構成ファイルのコードです。
以下は、配信不能キューのメッセージを処理するサービスのコードです。
using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;
namespace Microsoft.ServiceModel.Samples
{
// Define a service contract.
[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void SubmitPurchaseOrder(PurchaseOrder po);
}
// Service class that implements the service contract.
// Added code to write output to the console window
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
public class PurchaseOrderDLQService : IOrderProcessor
{
OrderProcessorClient orderProcessorService;
public PurchaseOrderDLQService()
{
orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
}
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
{
Console.WriteLine("Submitting purchase order did not succeed ", po);
MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
Console.WriteLine();
}
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
Console.WriteLine("Submitting purchase order did not succeed ", po);
MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
Console.WriteLine();
// Resend the message if timed out.
if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
{
// Re-send.
Console.WriteLine("Purchase order Time To Live expired");
Console.WriteLine("Trying to resend the message");
// Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
orderProcessorService.SubmitPurchaseOrder(po);
Console.WriteLine("Purchase order resent");
}
}
// Host the service within this EXE console application.
public static void Main()
{
// Create a ServiceHost for the PurchaseOrderDLQService type.
using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
{
// Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open();
// The service can now be accessed.
Console.WriteLine("The dead letter service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostBase to shutdown the service.
serviceHost.Close();
}
}
}
}
Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions
Namespace Microsoft.ServiceModel.Samples
' Define a service contract.
<ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples")> _
Public Interface IOrderProcessor
<OperationContract(IsOneWay:=True)> _
Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
End Interface
' Service class that implements the service contract.
' Added code to write output to the console window
<ServiceBehavior(InstanceContextMode:=InstanceContextMode.Single, ConcurrencyMode:=ConcurrencyMode.Single, AddressFilterMode:=AddressFilterMode.Any)> _
Public Class PurchaseOrderDLQService
Implements IOrderProcessor
Private orderProcessorService As OrderProcessorClient
Public Sub New()
orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
End Sub
<OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
Console.WriteLine("Submitting purchase order did not succeed ", po)
Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
Console.WriteLine()
End Sub
<OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
Console.WriteLine("Submitting purchase order did not succeed ", po)
Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
Console.WriteLine()
' Resend the message if timed out.
If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
' Re-send.
Console.WriteLine("Purchase order Time To Live expired")
Console.WriteLine("Trying to resend the message")
' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
orderProcessorService.SubmitPurchaseOrder(po)
Console.WriteLine("Purchase order resent")
End If
End Sub
' Host the service within this EXE console application.
Public Shared Sub Main()
' Create a ServiceHost for the PurchaseOrderDLQService type.
Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
' Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open()
' The service can now be accessed.
Console.WriteLine("The dead letter service is ready.")
Console.WriteLine("Press <ENTER> to terminate service.")
Console.WriteLine()
Console.ReadLine()
' Close the ServiceHostBase to shutdown the service.
serviceHost.Close()
End Using
End Sub
End Class
End Namespace
以下は、配信不能キュー サービス構成ファイルのコードです。