使用 WCF 通道模型在 Oracle 数据库中接收基于轮询的数据更改消息
可以将适用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为轮询 Oracle 数据库表或视图以查找任何数据更改。 若要执行此类轮询操作,适配器会定期对 Oracle 表或视图执行 SQL 查询,后跟可选的 PL/SQL 代码块。 然后,Oracle 数据库适配器会将 SQL 查询的结果作为入站 POLLINGSTMT 操作中的强类型结果集返回给代码。 有关用于使用 Oracle 数据库适配器在 Oracle 数据库上配置和执行轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前先阅读本主题。
通过在 OracleDBBinding 实例上设置绑定属性,将 Oracle 数据库适配器配置为轮询和 Oracle 数据库表或视图。 在 WCF 通道模型中,使用此绑定生成一个通道侦听器,从中可以获取 IInputChannel 通道以从适配器接收 POLLINGSTMT 操作。
有关如何在 WCF 中使用 IInputChannel 接收操作的概述,请参阅 服务 Channel-Level 编程。
本主题中的各节提供的信息有助于使用 WCF 通道模型对 Oracle 数据库表和视图执行轮询。
使用 POLLINGSTMT 请求消息
适配器对代码调用 POLLINGSTMT 操作以轮询 Oracle 数据库。 也就是说,适配器发送通过 IInputChannel 通道形状收到的 POLLINGSTMT 请求消息。 POLLINGSTMT 请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式之一使用 POLLINGSTMT 消息:
若要使用节点值流式处理消息,必须对响应消息调用 WriteBodyContents 方法,并向其传递实现节点值流式处理功能的 XmlDictionaryWriter 。
若要使用节点流式处理消息,可以对响应消息调用 GetReaderAtBodyContents 以获取 XmlReader。
通常使用节点值流式处理来使用包含 Oracle LOB 数据列的结果集。
有关 POLLINGSTMT 操作的消息结构的详细信息,请参阅 轮询操作的消息架构。
有关 Oracle 数据库适配器如何支持对 LOB 数据进行流式处理的详细信息,请参阅 在 Oracle 数据库适配器中流式处理大型对象数据类型。
有关在代码中实现节点值流式处理以支持 LOB 数据的端到端流式处理的详细信息,请参阅 使用 WCF 通道模型流式处理 Oracle Database LOB 数据类型。
关于本主题中使用的示例
本主题中的示例使用 SCOTT。ACCOUNTACTIVITY 表和 SCOTT。ACCOUNT_PKG。PROCESS_ACTIVITY 函数。 示例提供了生成这些项目的脚本。 该示例执行以下操作:
作为轮询语句的一部分,从 ACCOUNTACTIVITY 表中选择所有记录,并在控制台上显示。
作为投票后语句的一部分,该示例调用 PROCESS_ACTIVITY 函数,该函数将所有记录从 ACCOUNTACTIVITY 表移动到 ACTIVITYHISTORY 表。
对 ACCOUNTACTIVITY 表的后续轮询不会返回任何记录。 但是,如果希望示例在轮询操作过程中返回更多记录,则必须在 ACCOUNTACTIVITY 表中插入一些记录。 为此,可以运行示例随附的 more_activity_data.sql 脚本。
有关示例的详细信息,请参阅 适配器示例。
如何使用 IInputChannel 轮询 Oracle 数据库?
若要使用 WCF 通道模型轮询 Oracle 数据库表或视图以接收数据更改消息,请执行以下步骤。
使用 IInputChannel 接收数据更改的消息
在 Visual Studio 中创建 Visual C# 项目。 对于本主题,请创建控制台应用程序。
在解决方案资源管理器中添加对 、
Microsoft.ServiceModel.Channels
、System.ServiceModel
和 的Microsoft.Adapters.OracleDB
System.Runtime.Serialization
引用。打开 Program.cs 文件并添加以下命名空间:
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
System.Runtime.Serialization
System.IO
Microsoft.ServiceModel.Channels.Common
创建 OracleDBBinding 实例并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationType、 PollingStatement 和 PollingInterval 绑定属性。 在此示例中,还设置了 PostPollStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。
OracleDBBinding binding = new OracleDBBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PollingInterval = 30; binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
创建绑定参数集合并设置凭据。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "SCOTT"; credentials.UserName.Password = "TIGER"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
创建并打开通道侦听器。 通过在 OracleDBBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。 可以通过在连接 URI 中设置 PollingId 属性来修改 POLLINGSTMT 操作的目标命名空间。 有关适配器连接 URI 的详细信息,请参阅 创建 Oracle 数据库连接 URI。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
通过在侦听器上调用 AcceptChannel 方法获取 IInputChannel 通道,并将其打开。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
在通道上调用 Receive ,从适配器获取下一条 POLLINGSTMT 消息。
Message message = channel.Receive();
使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReader 或 XmlDictionaryWriter 来使用消息。
XmlReader reader = message.GetReaderAtBodyContents();
完成处理请求后关闭通道。
channel.Close()
重要
处理完 POLLINGSTMT 操作后,必须关闭通道。 关闭通道失败可能会影响代码的行为。
接收完数据更改的消息后,关闭侦听器。
listener.Close()
重要
关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。
示例
以下示例演示如何配置 Oracle 数据库适配器以使用 WCF 通道模型轮询 Oracle 数据库表和视图以及接收 POLLLINGSTMT 操作。 使用 XmlReader 将 POLLINGSTMT 操作中返回的结果集写入控制台。
using System;
using System.Collections.Generic;
using System.Text;
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
// Add this namespace for channel model
using System.ServiceModel.Channels;
using System.Xml;
using System.Runtime.Serialization;
using System.IO;
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;
namespace OraclePollingCM
{
class Program
{
static void Main(string[] args)
{
Uri connectionUri = new Uri("oracleDB://ADAPTER/");
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
// set timeout to receive POLLINGSTMT message
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
Console.WriteLine("Sample Started");
try
{
// Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the
// PollingStatement,and the PostPollStatement.
OracleDBBinding binding = new OracleDBBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingInterval = 30;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Create a binding parameter collection and set the credentials
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "SCOTT";
credentials.UserName.Password = "TIGER";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
Console.WriteLine("Opening listener");
// get a listener from the binding
listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
listener.Open();
Console.WriteLine("Opening channel");
// get a channel from the listener
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel opened -- waiting for polled data");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
// Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
Console.WriteLine("\nPolling data received for request number {0}", i+1);
Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");
while (reader.Read())
{
if (reader.IsStartElement())
{
switch (reader.Name)
{
case "POLLINGSTMTRECORD":
Console.Write("\n");
break;
case "TID":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "ACCOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "AMOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "TRANSDATE":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
default:
break;
}
}
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
// To save the polling data to a file you can REPLACE the code above with the following
//
// XmlDocument doc = new XmlDocument();
// doc.Load(reader);
// using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
// {
// doc.WriteTo(writer);
// }
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (TargetSystemException tex)
{
Console.WriteLine("Exception occurred on the Oracle Database");
Console.WriteLine(tex.InnerException.Message);
}
catch (ConnectionException cex)
{
Console.WriteLine("Exception occurred connecting to the Oracle Database");
Console.WriteLine(cex.InnerException.Message);
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}