WCF チャネル モデルを使用して Oracle Database でポーリング ベースのデータ変更メッセージを受信する
Microsoft BizTalk Adapter for Oracle Database を構成して、Oracle データベース テーブルまたはビューでデータの変更をポーリングできます。 このようなポーリング操作を実行するために、アダプターは Oracle テーブルまたはビューに対して SQL クエリを定期的に実行し、その後にオプションの PL/SQL コード ブロックを実行します。 SQL クエリの結果は、Oracle Database アダプターによって、受信 POLLINGSTMT 操作で厳密に型指定された結果セットとしてコードに返されます。 Oracle データベース アダプターを使用して Oracle データベースのポーリングを構成および実行するために使用されるメカニズムの詳細については、「 Oracle Database アダプターでポーリング ベースのデータ変更メッセージを受信する」を参照してください。 先に進む前に、このトピックを読むことを強くお勧めします。
OracleDBBinding のインスタンスにバインド プロパティを設定して、Oracle データベース のテーブルまたはビューをポーリングするように Oracle Database アダプターを構成します。 WCF チャネル モデルでは、このバインディングを使用してチャネル リスナーを作成します。このリスナーから IInputChannel チャネルを取得し、アダプターから POLLINGSTMT 操作を受信できます。
WCF で IInputChannel を使用して操作を受け取る方法の概要については、「 サービス Channel-Level プログラミング」を参照してください。
このトピックのセクションでは、WCF チャネル モデルを使用して Oracle データベース テーブルとビューに対するポーリングを実行するのに役立つ情報を提供します。
POLLINGSTMT 要求メッセージの使用
アダプターは、Oracle データベースをポーリングするために、コードに対して POLLINGSTMT 操作を呼び出します。 つまり、アダプターは、 IInputChannel チャネル図形を介して受信した POLLINGSTMT 要求メッセージを送信します。 POLLINGSTMT 要求メッセージには、 PollingStatement バインド プロパティで指定されたクエリの結果セットが含まれています。 POLLINGSTMT メッセージは、次の 2 つの方法のいずれかで使用できます。
ノード値ストリーミングを使用してメッセージを使用するには、応答メッセージで WriteBodyContents メソッドを呼び出し、ノード値ストリーミングを実装する XmlDictionaryWriter を渡す必要があります。
ノード ストリーミングを使用してメッセージを使用するには、応答メッセージで GetReaderAtBodyContents を呼び出して XmlReader を取得します。
通常、ノード値ストリーミングを使用して、Oracle LOB データ列を含む結果セットを使用します。
POLLINGSTMT 操作のメッセージ構造の詳細については、「ポーリング操作の メッセージ スキーマ」を参照してください。
Oracle Database アダプターが LOB データのストリーミングをサポートする方法の詳細については、「 Oracle Database アダプターでのラージ オブジェクト データ型のストリーミング」を参照してください。
LOB データのエンド ツー エンド ストリーミングをサポートするためにコードにノード値ストリーミングを実装する方法の詳細については、「 WCF チャネル モデルを使用した Oracle データベース LOB データ型のストリーミング」を参照してください。
このトピックで使用する例について
このトピックの例では、SCOTT を使用します。ACCOUNTACTIVITY テーブルと SCOTT。ACCOUNT_PKG。PROCESS_ACTIVITY関数。 これらの成果物を生成するスクリプトは、サンプルと共に提供されます。 この例では、次の操作を実行します。
ポーリング ステートメントの一部として、ACCOUNTACTIVITY テーブルからすべてのレコードを選択し、コンソールに表示します。
この例では、post poll ステートメントの一部として、ACCOUNTACTIVITY テーブルから ACTIVITYHISTORY テーブルにすべてのレコードを移動するPROCESS_ACTIVITY関数を呼び出します。
ACCOUNTACTIVITY テーブルに対する後続のポーリングでは、レコードは返されません。 ただし、この例でポーリング操作の一部としてさらに多くのレコードを返す場合は、ACCOUNTACTIVITY テーブルにいくつかのレコードを挿入する必要があります。 これを行うには、サンプルに付属のmore_activity_data.sql スクリプトを実行します。
サンプルの詳細については、「 アダプターのサンプル」を参照してください。
IInputChannel を使用して Oracle データベースをポーリングする方法
WCF チャネル モデルを使用して Oracle データベース のテーブルまたはビューをポーリングしてデータ変更メッセージを受信するには、次の手順を実行します。
IInputChannel を使用してデータ変更メッセージを受信するには
Visual Studio で Visual C# プロジェクトを作成します。 このトピックでは、コンソール アプリケーションを作成します。
ソリューション エクスプローラーで、および への参照を
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
追加します。System.Runtime.Serialization
Program.cs ファイルを開き、次の名前空間を追加します。
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
System.Runtime.Serialization
System.IO
Microsoft.ServiceModel.Channels.Common
OracleDBBinding のインスタンスを作成し、ポーリングの構成に必要なバインド プロパティを設定します。 少なくとも 、InboundOperationType、 PollingStatement、および PollingInterval バインド プロパティを設定する必要があります。 この例では、 PostPollStatement バインド プロパティも設定します。 ポーリングの構成に使用されるバインディング プロパティの詳細については、「 Oracle Database アダプターでポーリング ベースのデータ変更メッセージを受信する」を参照してください。
OracleDBBinding binding = new OracleDBBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PollingInterval = 30; binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
バインド パラメーター コレクションを作成し、資格情報を設定します。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "SCOTT"; credentials.UserName.Password = "TIGER"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
チャネル リスナーを作成して開きます。 OracleDBBinding で BuildChannelListener<IInputChannel> メソッドを呼び出して、リスナーを作成します。 接続 URI で PollingId プロパティを設定することで、POLLINGSTMT 操作のターゲット名前空間を変更できます。 アダプター接続 URI の詳細については、「 Oracle データベース接続 URI の作成」を参照してください。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
リスナーで AcceptChannel メソッドを呼び出して IInputChannel チャネルを取得し、開きます。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
チャネルで Receive を呼び出して、アダプターから次の POLLINGSTMT メッセージを取得します。
Message message = channel.Receive();
POLLINGSTMT 操作によって返された結果セットを使用します。 メッセージは、 XmlReader または XmlDictionaryWriter を使用して使用できます。
XmlReader reader = message.GetReaderAtBodyContents();
要求の処理が完了したら、チャネルを閉じます。
channel.Close()
重要
POLLINGSTMT 操作の処理が完了したら、チャネルを閉じる必要があります。 チャネルを閉じ失敗すると、コードの動作に影響する可能性があります。
データ変更メッセージの受信が完了したら、リスナーを閉じます。
listener.Close()
重要
リスナーを閉じても、リスナーを使用して作成されたチャネルは閉じません。 リスナーを使用して作成された各チャネルを明示的に閉じる必要があります。
例
次の例は、Oracle データベース のテーブルとビューをポーリングし、WCF チャネル モデルを使用して POLLLINGSTMT 操作を受信するように Oracle Database アダプターを構成する方法を示しています。 POLLINGSTMT 操作で返される結果セットは、 XmlReader を使用してコンソールに書き込まれます。
using System;
using System.Collections.Generic;
using System.Text;
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
// Add this namespace for channel model
using System.ServiceModel.Channels;
using System.Xml;
using System.Runtime.Serialization;
using System.IO;
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;
namespace OraclePollingCM
{
class Program
{
static void Main(string[] args)
{
Uri connectionUri = new Uri("oracleDB://ADAPTER/");
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
// set timeout to receive POLLINGSTMT message
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
Console.WriteLine("Sample Started");
try
{
// Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the
// PollingStatement,and the PostPollStatement.
OracleDBBinding binding = new OracleDBBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingInterval = 30;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Create a binding parameter collection and set the credentials
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "SCOTT";
credentials.UserName.Password = "TIGER";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
Console.WriteLine("Opening listener");
// get a listener from the binding
listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
listener.Open();
Console.WriteLine("Opening channel");
// get a channel from the listener
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel opened -- waiting for polled data");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
// Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
Console.WriteLine("\nPolling data received for request number {0}", i+1);
Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");
while (reader.Read())
{
if (reader.IsStartElement())
{
switch (reader.Name)
{
case "POLLINGSTMTRECORD":
Console.Write("\n");
break;
case "TID":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "ACCOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "AMOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "TRANSDATE":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
default:
break;
}
}
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
// To save the polling data to a file you can REPLACE the code above with the following
//
// XmlDocument doc = new XmlDocument();
// doc.Load(reader);
// using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
// {
// doc.WriteTo(writer);
// }
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (TargetSystemException tex)
{
Console.WriteLine("Exception occurred on the Oracle Database");
Console.WriteLine(tex.InnerException.Message);
}
catch (ConnectionException cex)
{
Console.WriteLine("Exception occurred connecting to the Oracle Database");
Console.WriteLine(cex.InnerException.Message);
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}