使用 WCF 服务模型在 Oracle 数据库中接收基于轮询的数据更改消息
可以将适用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为针对 Oracle 表或视图接收基于轮询的数据更改消息。 为了接收数据更改的消息,适配器定期针对 Oracle 表或视图执行 SQL 查询,后跟可选的 PL/SQL 代码块。 然后,在入站 POLLINGSTMT 操作中,Oracle 数据库适配器将 SQL 查询的结果作为强类型结果集返回给应用程序。 有关用于使用 Oracle 数据库适配器在 Oracle 数据库上配置和执行轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前先阅读本主题。
若要在使用 WCF 服务模型时接收 POLLINGSTMT 操作,必须:
从适配器公开的元数据 (接口) 为 POLLINGSTMT 操作生成 WCF 服务协定。 为此,请使用添加适配器服务参考 Visual Studio 插件或 ServiceModel 元数据实用工具 (svcutil.exe) 。
从此接口实现 WCF 服务。
使用服务主机 (System.ServiceModel.ServiceHost) 托管此 WCF 服务。
本部分中的主题提供了有助于对 WCF 服务模型中的 Oracle 数据库表和视图执行轮询的信息和过程。
关于本主题中使用的示例
本主题中的示例使用 /SCOTT/ACCOUNTACTIVITY 表和 /SCOTT/Package/ACCOUNT_PKG/PROCESS_ACTIVITY 函数。 BizTalk 适配器包示例提供了生成这些项目的脚本。 有关示例的详细信息,请参阅 适配器示例。
在 WCF 服务模型中配置轮询
通过将绑定属性和可选连接属性 (参数) ,将 Oracle 数据库适配器配置为对 Oracle 数据库表和视图执行轮询。 其中一些属性是必需的,有些属性必须同时在设计时和运行时设置才能生效。
在设计时,在连接到 Oracle 数据库以生成 WCF 服务协定时设置连接参数和绑定属性。
在运行时,在用于创建服务主机的 OracleDBBinding 对象上设置绑定属性。 将服务侦听器添加到服务主机时,可以设置连接参数。
以下列表简要概述了用于配置轮询的绑定属性和连接参数:
PollingStatement 绑定属性。 必须在设计时和运行时设置此绑定属性。
可选绑定属性。 只需在运行时设置这些设置。
AcceptCredentialsInUri 绑定属性。 如果要在连接 URI 中启用凭据,则必须在运行时将此绑定属性设置为 true 。 将服务终结点添加到服务主机时,连接 URI 中必须存在用户名和密码。
连接 URI 中的 PollingId 查询字符串参数。 如果要更改 POLLINGSTMT 操作的命名空间,必须在设计时和运行时设置此连接属性。
有关用于配置轮询的绑定属性和连接参数的完整说明,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。
WCF 服务协定和类
使用添加适配器服务参考 Visual Studio 插件或 ServiceModel 元数据实用工具 (svcutil.exe) 创建 WCF 服务协定, (接口) 和 POLLINGSTMT 操作的支持类。
使用以下任一工具连接到 Oracle 数据库以生成 POLLINGSTMT 操作的服务协定时:
必须指定 PollingStatement 绑定属性。 适配器使用此绑定属性中的 SELECT 语句为 POLLINGSTMT 操作返回的强类型结果集生成正确的元数据。
可以选择在连接 URI 中指定 PollingId 参数。 适配器使用此参数为 POLLINGSTMT 操作生成命名空间。
在以下示例中:
PollingStatement 设置为“SELECT * FROM ACCOUNTACTIVITY FOR UPDATE”。
PollingId 设置为“AcctActivity”。
WCF 服务协定 (接口)
以下代码演示为 POLLINGSTMT 操作生成的 WCF 服务协定 (接口) 。
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.ServiceContractAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03", ConfigurationName="POLLINGSTMT_OperationGroup")]
public interface POLLINGSTMT_OperationGroup {
// CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)
// of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)
[System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMT")]
void POLLINGSTMT(POLLINGSTMT request);
}
消息协定
消息协定命名空间由连接 URI 中的 PollingId 参数修改。 请求消息返回一组强类型记录。
[System.Diagnostics.DebuggerStepThroughAttribute()]
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.MessageContractAttribute(WrapperName="POLLINGSTMT", WrapperNamespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", IsWrapped=true)]
public partial class POLLINGSTMT {
[System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", Order=0)]
public microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD;
public POLLINGSTMT() {
}
public POLLINGSTMT(microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD) {
this.POLLINGSTMTRECORD = POLLINGSTMTRECORD;
}
}
数据协定命名空间
“数据协定”是在服务与客户端之间达成的正式协议,用于以抽象方式描述要交换的数据。 也就是说,为了进行通信,客户端和服务不必共享相同的类型,只需共享相同的数据协定。
如果发生数据更改消息,则如果在连接 URI 中指定) ,则 PollingId 参数也会修改数据协定命名空间 (。 数据协定由一个类组成,该类表示查询结果集中的强类型记录。 此示例省略了类定义的详细信息。 类包含表示结果集中的列的属性。
在以下示例中,使用 PollingId“AcctActivity”。
namespace microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity {
using System.Runtime.Serialization;
[System.Diagnostics.DebuggerStepThroughAttribute()]
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "3.0.0.0")]
[System.Runtime.Serialization.DataContractAttribute(Name="POLLINGSTMTRECORD", Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity")]
public partial class POLLINGSTMTRECORD : object, System.Runtime.Serialization.IExtensibleDataObject {…}
}
}
WCF 服务类
添加适配器服务引用插件还会生成一个文件,该文件具有从服务协定 (接口) 实现的 WCF 服务类的存根。 文件的名称是 OracleDBBindingService.cs。 可以插入逻辑以直接在此类中处理 POLLINGSTMT 操作。 如果使用 svcutil.exe 生成服务协定接口,则必须自行实现此类。 以下代码显示由添加适配器服务引用插件生成的 WCF 服务类。
namespace OracleDBBindingNamespace {
public class OracleDBBindingService : POLLINGSTMT_OperationGroup {
// CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)
// of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)
public virtual void POLLINGSTMT(POLLINGSTMT request) {
throw new System.NotImplementedException("The method or operation is not implemented.");
}
}
}
接收 POLLINGSTMT 操作
从 Oracle 数据库适配器接收轮询数据
使用添加适配器服务引用插件或 svcutil.exe 为 POLLINGSTMT 操作生成 WCF 服务协定 (接口) 和帮助程序类。 有关详细信息,请参阅 为 Oracle 数据库解决方案项目生成 WCF 客户端或 WCF 服务协定。 连接到适配器时,至少必须设置 PollingStatement 绑定属性。 可以选择在连接 URI 中指定 PollingId 参数。 如果使用添加适配器服务引用插件,则应设置配置所需的所有绑定参数。 这可以保证在生成的配置文件中正确设置它们。
从步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果处理从 POLLINGSTMT 操作接收的数据时遇到错误,此类的 POLLINGSTMT 方法可能会引发异常来中止轮询事务;否则, 方法不返回任何内容。 必须按如下所示对 WCF 服务类进行属性设置:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
如果使用了添加适配器服务引用插件来生成接口,则可以直接在生成的 OracleDBBindingService 类的 POLLINGSTMT 方法中实现逻辑。 此类可在 OracleDBBindingService.cs 中找到。 此示例中的此代码子类为 OracleDBBindingService 类。
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] public class PollingStmtService : OracleDBBindingService { public override void POLLINGSTMT(POLLINGSTMT request) { Console.WriteLine("\nNew Polling Records Received"); Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription"); for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++) { Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID, request.POLLINGSTMTRECORD[i].ACCOUNT, request.POLLINGSTMTRECORD[i].AMOUNT, request.POLLINGSTMTRECORD[i].TRANSDATE, request.POLLINGSTMTRECORD[i].DESCRIPTION); } } }
如果使用 svcutil.exe 来生成接口,则必须创建实现接口的 WCF 服务,并在此类的 POLLINGSTMT 方法中实现逻辑。
创建在步骤 2 中创建的 WCF 服务的实例。
// create service instance PollingStmtService pollingInstance = new PollingStmtService();
使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基本连接 URI 不能包含 userinfoparams 或query_string。
// Enable service host Uri[] baseUri = new Uri[] { new Uri("oracledb://Adapter") }; ServiceHost srvHost = new ServiceHost(pollingInstance, baseUri);
创建 OracleDBBinding 并通过设置其绑定属性来配置轮询操作。 可以在代码中显式执行此操作,也可以在配置中以声明方式执行此操作。 至少必须指定轮询语句和轮询间隔。 在此示例中,将凭据指定为 URI 的一部分,因此还必须将 AcceptCredentialsInUri 设置为 true。
// Create and configure a binding for the service endpoint. NOTE: binding // parameters are set here for clarity, but these are already set in the // the generated configuration file OracleDBBinding binding = new OracleDBBinding(); // The credentials are included in the connection URI, so set this property to true binding.AcceptCredentialsInUri = true; // Same as statement specified in Configure Adapter dialog box binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"; // Be sure to set the interval long enough to complete processing before // the next poll binding.PollingInterval = 15; // Polling is transactional; be sure to set an adequate isolation level // for your environment binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;
将服务终结点添加到服务主机。 为此,请按以下步骤操作:
使用步骤 5 中创建的绑定。
指定包含凭据的连接 URI,并根据需要指定 PollingId。
将协定指定为“POLLINGSTMT_OperationGroup”。
// Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract Uri serviceUri = new Uri("oracledb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity"); srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);
若要接收轮询数据,请打开服务主机。 每当查询返回结果集时,适配器都会返回数据。
// Open the service host to begin polling srvHost.Open();
若要终止轮询,请关闭服务主机。
重要
适配器将继续轮询,直到服务主机关闭。
srvHost.Close();
示例
以下示例演示针对 /SCOTT/ACCOUNTACTIVITY 表执行的轮询查询。 轮询后语句调用一个 Oracle 函数,该函数将已处理的记录移到另一个表 /SCOTT/ACCOUNTHISTORY。 通过在连接 URI 中将 PollingId 参数设置为“AccountActivity”来修改 POLLINGSTMT 操作的命名空间。 在此示例中,POLLINGSTMT 操作的 WCF 服务是通过对生成的 OracleDBBindingService 类进行子类创建的;但是,可以直接在生成的类中实现逻辑。
using System;
using System.Collections.Generic;
using System.Text;
// Add these three references to use the Oracle adapter
using System.ServiceModel;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
using microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity;
using OracleDBBindingNamespace;
namespace OraclePollingSM
{
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class PollingStmtService : OracleDBBindingService
{
public override void POLLINGSTMT(POLLINGSTMT request)
{
Console.WriteLine("\nNew Polling Records Received");
Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");
for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)
{
Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,
request.POLLINGSTMTRECORD[i].ACCOUNT,
request.POLLINGSTMTRECORD[i].AMOUNT,
request.POLLINGSTMTRECORD[i].TRANSDATE,
request.POLLINGSTMTRECORD[i].DESCRIPTION);
}
Console.WriteLine("\nHit <RETURN> to stop polling");
}
}
class Program
{
static void Main(string[] args)
{
ServiceHost srvHost = null;
// This URI is used to specify the address for the ServiceEndpoint
// It must contain credentials and the PollingId (if any) that was used to generate
// the WCF service callback interface
Uri serviceUri = new Uri("OracleDb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");
// This URI is used to initialize the ServiceHost. It cannot contain
// userinfoparms (credentials) or a query_string (PollingId); otherwise,
// an exception is thrown when the ServiceHost is initialized.
Uri[] baseUri = new Uri[] { new Uri("OracleDb://Adapter") };
Console.WriteLine("Sample started, initializing service host -- please wait");
// create an instanc of the WCF service callback class
PollingStmtService pollingInstance = new PollingStmtService();
try
{
// Create a ServiceHost with the service callback instance and a base URI (address)
srvHost = new ServiceHost(pollingInstance, baseUri);
// Create and configure a binding for the service endpoint. Note: binding
// parameters are set here for clarity but these are already set in the
// generated configuration file
//
// The following properties are set
// AcceptCredentialsInUri (true) to enable credentials in the connection URI for AddServiceEndpoint
// PollingStatement
// PostPollStatement calls PROCESS_ACTIVITY on Oracle. This procedure moves the queried records to
// the ACCOUNTHISTORY table
// PollingInterval (15 seconds)
// TransactionIsolationLevel
OracleDBBinding binding = new OracleDBBinding();
// The Credentials are included in the Connection Uri so set this property true
binding.AcceptCredentialsInUri = true;
// Same as statement specified in Configure Adapter dialog box
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Be sure to set the interval long enough to complete processing before
// the next poll
binding.PollingInterval = 15;
// Polling is transactional, be sure to set an adequate isolation level
// for your environment
binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;
// Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract
srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);
Console.WriteLine("Opening the service host");
// Open the service host to begin polling
srvHost.Open();
// Wait to receive request
Console.WriteLine("\nPolling started. Returned records will be written to the console.");
Console.WriteLine("Hit <RETURN> to stop polling");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine("Exception :" + e.Message);
Console.ReadLine();
/* If there is an Oracle Error it will be specified in the inner exception */
if (e.InnerException != null)
{
Console.WriteLine("InnerException: " + e.InnerException.Message);
Console.ReadLine();
}
}
finally
{
// IMPORTANT: you must close the ServiceHost to stop polling
if (srvHost.State == CommunicationState.Opened)
srvHost.Close();
else
srvHost.Abort();
}
}
}
}