使用 WCF 通道模型在 Oracle 資料庫中接收輪詢型資料變更訊息
您可以設定 Microsoft BizTalk Adapter for Oracle Database 來輪詢 Oracle 資料庫資料表或檢視是否有任何資料變更。 若要執行這類輪詢作業,配接器會定期針對 Oracle 資料表或檢視執行 SQL 查詢,後面接著選擇性 PL/SQL 程式碼區塊。 然後,Oracle 資料庫配接器會將 SQL 查詢的結果傳回至您的程式碼,作為輸入 POLLINGSTMT 作業中的強型別結果集。 如需使用 Oracle 資料庫配接器在 Oracle 資料庫上設定和執行輪詢之機制的詳細資訊,請參閱 在 Oracle 資料庫配接器中接收輪詢型資料變更訊息。 強烈建議您先閱讀本主題,再繼續進行。
您可以在 OracleDBBinding實例上設定系結屬性,以設定 Oracle Database 配接器來輪詢和 Oracle 資料庫資料表或檢視。 在 WCF 通道模型中,您會使用此系結來建置通道接聽程式,您可以從中取得 IInputChannel 通道來接收配接器的 POLLINGSTMT 作業。
如需如何在 WCF 中使用 IInputChannel 接收作業的概觀,請參閱 服務 Channel-Level 程式設計。
本主題中的各節提供資訊,協助您使用 WCF 通道模型對 Oracle 資料庫資料表和檢視執行輪詢。
取用 POLLINGSTMT 要求訊息
配接器會在程式碼上叫用 POLLINGSTMT 作業,以輪詢 Oracle 資料庫。 也就是說,配接器會傳送您透過 IInputChannel 通道圖形收到的 POLLINGSTMT 要求訊息。 POLLINGSTMT 要求訊息包含 PollingStatement 系結屬性所指定的查詢結果集。 您可以使用下列兩種方式之一來取用 POLLINGSTMT 訊息:
若要使用節點值串流取訊息,您必須在回應訊息上呼叫 WriteBodyContents 方法,並傳遞實作節點值串流的 XmlDictionaryWriter 。
若要使用節點串流取訊息,您可以在回應訊息上呼叫 GetReaderAtBodyContents 以取得 XmlReader。
您通常會使用節點值串流來取用包含 Oracle LOB 資料行的結果集。
如需 POLLINGSTMT 作業之訊息結構的詳細資訊,請參閱 輪詢作業的訊息架構。
如需 Oracle Database 配接器如何支援 LOB 資料串流的詳細資訊,請參閱 在 Oracle 資料庫配接器中串流大型物件資料類型。
如需在程式碼中實作節點值串流以支援 LOB 資料的端對端串流的詳細資訊,請參閱 使用 WCF 通道模型串流 Oracle Database LOB 資料類型。
關於本主題中使用的範例
本主題中的範例會使用 SCOTT。ACCOUNTACTIVITY 資料表和 SCOTT。ACCOUNT_PKG。PROCESS_ACTIVITY函式。 產生這些成品的腳本會隨附範例。 此範例會執行下列作業:
在輪詢語句中,從 ACCOUNTACTIVITY 資料表選取所有記錄,並在主控台上顯示。
作為 post poll 語句的一部分,此範例會叫用PROCESS_ACTIVITY函式,將所有記錄從 ACCOUNTACTIVITY 資料表移至 ACTIVITYHISTORY 資料表。
ACCOUNTACTIVITY 資料表上的後續輪詢不會傳回任何記錄。 不過,如果您想要範例在輪詢作業中傳回更多記錄,則必須在 ACCOUNTACTIVITY 資料表中插入一些記錄。 您可以執行範例所提供的 more_activity_data.sql 腳本來執行此動作。
如需範例的詳細資訊,請參閱 配接器範例。
如何使用 IInputChannel 輪詢 Oracle 資料庫?
若要輪詢 Oracle 資料庫資料表或檢視,以使用 WCF 通道模型接收資料變更訊息,請執行下列步驟。
使用 IInputChannel 接收資料變更的訊息
在 Visual Studio 中建立 Visual C# 專案。 針對本主題,建立主控台應用程式。
在方案總管中,新增 、
Microsoft.ServiceModel.Channels
System.ServiceModel
和System.Runtime.Serialization
的Microsoft.Adapters.OracleDB
參考。開啟 Program.cs 檔案,並新增下列命名空間:
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
System.Runtime.Serialization
System.IO
Microsoft.ServiceModel.Channels.Common
建立 OracleDBBinding 的實例,並設定設定輪詢所需的系結屬性。 您至少必須設定 InboundOperationType、 PollingStatement和 PollingInterval 系結屬性。 在此範例中,您也會設定 PostPollStatement 系結屬性。 如需用來設定輪詢之系結屬性的詳細資訊,請參閱 在 Oracle 資料庫配接器中接收輪詢型資料變更訊息。
OracleDBBinding binding = new OracleDBBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PollingInterval = 30; binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
建立系結參數集合並設定認證。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "SCOTT"; credentials.UserName.Password = "TIGER"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
建立通道接聽程式並加以開啟。 您可以叫用OracleDBBinding上的BuildChannelListener < IInputChannel >方法來建立接聽程式。 您可以在連線 URI 中設定 PollingId 屬性,以修改 POLLINGSTMT 作業的目標命名空間。 如需介面卡連線 URI 的詳細資訊,請參閱 建立 Oracle 資料庫連線 URI。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
在接聽程式上叫用AcceptChannel方法並加以開啟,以取得IInputChannel通道。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
在通道上叫用 Receive ,以從配接器取得下一個 POLLINGSTMT 訊息。
Message message = channel.Receive();
取用 POLLINGSTMT 作業所傳回的結果集。 您可以使用 XmlReader 或 XmlDictionaryWriter來取用訊息。
XmlReader reader = message.GetReaderAtBodyContents();
當您完成處理要求時,請關閉通道。
channel.Close()
重要
完成 POLLINGSTMT 作業之後,您必須關閉通道。 無法關閉通道可能會影響程式碼的行為。
當您完成接收資料變更的訊息時,請關閉接聽程式。
listener.Close()
重要
關閉接聽程式不會關閉使用接聽程式建立的通道。 您必須明確關閉使用接聽程式建立的每個通道。
範例
下列範例示範如何設定 Oracle 資料庫配接器來輪詢 Oracle 資料庫資料表和檢視,並使用 WCF 通道模型接收 POLLLINGSTMT 作業。 POLLINGSTMT 作業中傳回的結果集會使用 XmlReader寫入主控台。
using System;
using System.Collections.Generic;
using System.Text;
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
// Add this namespace for channel model
using System.ServiceModel.Channels;
using System.Xml;
using System.Runtime.Serialization;
using System.IO;
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;
namespace OraclePollingCM
{
class Program
{
static void Main(string[] args)
{
Uri connectionUri = new Uri("oracleDB://ADAPTER/");
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
// set timeout to receive POLLINGSTMT message
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
Console.WriteLine("Sample Started");
try
{
// Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the
// PollingStatement,and the PostPollStatement.
OracleDBBinding binding = new OracleDBBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingInterval = 30;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Create a binding parameter collection and set the credentials
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "SCOTT";
credentials.UserName.Password = "TIGER";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
Console.WriteLine("Opening listener");
// get a listener from the binding
listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
listener.Open();
Console.WriteLine("Opening channel");
// get a channel from the listener
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel opened -- waiting for polled data");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
// Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
Console.WriteLine("\nPolling data received for request number {0}", i+1);
Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");
while (reader.Read())
{
if (reader.IsStartElement())
{
switch (reader.Name)
{
case "POLLINGSTMTRECORD":
Console.Write("\n");
break;
case "TID":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "ACCOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "AMOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "TRANSDATE":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
default:
break;
}
}
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
// To save the polling data to a file you can REPLACE the code above with the following
//
// XmlDocument doc = new XmlDocument();
// doc.Load(reader);
// using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
// {
// doc.WriteTo(writer);
// }
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (TargetSystemException tex)
{
Console.WriteLine("Exception occurred on the Oracle Database");
Console.WriteLine(tex.InnerException.Message);
}
catch (ConnectionException cex)
{
Console.WriteLine("Exception occurred connecting to the Oracle Database");
Console.WriteLine(cex.InnerException.Message);
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}