使用 WCF 通道模型从SQL Server接收基于轮询的数据更改消息
可以将 SQL 适配器配置为接收SQL Server表或视图的定期数据更改消息。 可以指定适配器执行的轮询语句来轮询数据库。 轮询语句可以是 SELECT 语句,也可以是返回结果集的存储过程。
有关适配器如何支持轮询的详细信息,请参阅 支持使用轮询的入站调用。
重要
如果要在单个应用程序中执行多个轮询操作,则必须将 InboundID 连接属性指定为连接 URI 的一部分,使其唯一。 指定的入站 ID 将添加到操作命名空间,使其唯一。
本主题如何演示轮询
在本主题中,为了演示 SQL 适配器如何支持接收数据更改消息,请为 轮询 操作创建 .NET 应用程序。 对于本主题,请将 PolledDataAvailableStatement 指定为:
SELECT COUNT(*) FROM Employee
PolledDataAvailableStatement 必须返回包含正值的第一个单元格的结果集。 如果第一个单元格不包含正值,则适配器不会执行轮询语句。
作为轮询语句的一部分,执行以下操作:
从 Employee 表中选择所有行。
(MOVE_EMP_DATA) 执行存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。
(ADD_EMP_DETAILS) 执行存储过程,将新记录添加到 Employee 表。 此过程采用员工姓名、任命和薪水作为参数。
若要执行这些操作,必须为 PollingStatement 绑定属性指定以下内容:
SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
执行轮询语句后,将选择 Employee 表中的所有记录,并接收来自SQL Server的消息。 适配器执行MOVE_EMP_DATA存储过程后,所有记录将移动到 EmployeeHistory 表。 然后,执行ADD_EMP_DETAILS存储过程,将新记录添加到 Employee 表。 下一次轮询执行将仅返回一条记录。 此循环会一直持续到关闭通道侦听器为止。
使用 SQL 适配器绑定属性配置轮询查询
下表汇总了用于将适配器配置为接收数据更改消息的 SQL 适配器绑定属性。 必须将这些绑定属性指定为 .NET 应用程序的一部分进行轮询。
Binding 属性 | 说明 |
---|---|
InboundOperationType | 指定是要执行 轮询、 TypedPolling 还是 通知 入站操作。 默认值为 轮询。 |
PolledDataAvailableStatement | 指定适配器执行的 SQL 语句,以确定是否有任何数据可用于轮询。 SQL 语句必须返回由行和列组成的结果集。 仅当行可用时,才会执行为 PollingStatement 绑定属性指定的 SQL 语句。 |
PollingIntervalInSeconds | 指定 SQL 适配器执行为 PolledDataAvailableStatement 绑定属性指定的语句的间隔(以秒为单位)。 默认为 30 秒。 轮询间隔确定连续轮询之间的时间间隔。 如果在指定的时间间隔内执行语句,适配器将等待间隔中的剩余时间。 |
PollingStatement | 指定要轮询SQL Server数据库表的 SQL 语句。 可以为轮询语句指定简单的 SELECT 语句或存储过程。 默认值为 NULL。 必须指定 PollingStatement 的值才能启用轮询。 仅当有可供轮询的数据(由 PolledDataAvailableStatement 绑定属性确定)时,才会执行轮询语句。 可以指定任意数量的 SQL 语句,用分号分隔。 |
PollWhileDataFound | 指定 SQL 适配器是否忽略轮询间隔并持续执行为 PolledDataAvailableStatement 绑定属性指定的 SQL 语句(如果数据在轮询表中可用)。 如果表中没有可用数据,适配器将还原为按指定的轮询间隔执行 SQL 语句。 默认值为 false。 |
有关这些属性的更完整说明,请参阅阅读有关 BizTalk 适配器SQL Server适配器绑定属性的信息。 有关如何使用 SQL 适配器轮询SQL Server的完整说明,请阅读本主题的其余部分。
使用轮询请求消息
适配器对代码调用轮询操作以轮询SQL Server数据库。 也就是说,适配器发送通过 IInputChannel 通道形状收到的轮询请求消息。 轮询请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式之一使用轮询消息:
若要使用节点值流式处理使用消息,必须在响应消息上调用 WriteBodyContents 方法,并向其传递实现节点值流式处理的 XmlDictionaryWriter 。
若要使用节点流式处理使用消息,可以在响应消息上调用 GetReaderAtBodyContents 以获取 XmlReader。
关于本主题中使用的示例
本主题中的示例轮询 Employee 表。 该示例还使用 MOVE_EMP_DATA 和 ADD_EMP_DETAILS 存储过程。 示例提供了生成这些项目的脚本。 有关示例的详细信息,请参阅 SQL 适配器的示例。 SQL 适配器示例也提供了基于本主题 的示例 Polling_ChannelModel。
使用 WCF 通道模型接收轮询操作的入站消息
本部分介绍如何编写 .NET 应用程序 (通道模型) 以使用 SQL 适配器接收入站轮询消息。
从 SQL 适配器接收轮询消息
在 Visual Studio 中创建 Microsoft Visual C# 项目。 对于本主题,请创建控制台应用程序。
在解决方案资源管理器中添加对
Microsoft.Adapters.Sql
、Microsoft.ServiceModel.Channels
、System.ServiceModel
和 的System.Runtime.Serialization
引用。打开 Program.cs 文件并添加以下命名空间:
Microsoft.Adapters.Sql
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
指定连接 URI。 有关适配器连接 URI 的详细信息,请参阅创建SQL Server连接 URI。
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
创建 SqlAdapterBinding 的实例,并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationType、 PolledDataAvailableStatement 和 PollingStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 支持使用轮询的入站调用。
SqlAdapterBinding binding = new SqlAdapterBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE"; binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
创建绑定参数集合并设置凭据。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "<Enter user name here>"; credentials.UserName.Password = "<Enter password here>"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
创建通道侦听器并将其打开。 通过在 SqlAdapterBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
通过在侦听器上调用 AcceptChannel 方法并打开它来获取 IInputChannel 通道。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
在通道上调用 Receive ,从适配器获取下一条 POLLINGSTMT 消息。
Message message = channel.Receive();
使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReader 或 XmlDictionaryWriter 来使用消息。
XmlReader reader = message.GetReaderAtBodyContents();
完成处理请求后关闭通道。
channel.Close()
重要
处理完 POLLINGSTMT 操作后,必须关闭通道。 关闭通道失败可能会影响代码的行为。
完成接收数据更改消息后,关闭侦听器。
listener.Close()
重要
关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。
示例
以下示例演示执行 Employee 表的轮询查询。 轮询语句执行以下任务:
从 Employee 表中选择所有记录。
执行MOVE_EMP_DATA存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。
执行ADD_EMP_DETAILS存储过程,将单个记录添加到 Employee 表。
轮询消息保存在 。
C:\PollingOutput.xml
using System;
using Microsoft.Adapters.Sql;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Channels;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Sample started. This sample will poll 5 times and will perform the following tasks:");
Console.WriteLine("Press any key to start polling...");
Console.ReadLine();
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
try
{
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
SqlAdapterBinding binding = new SqlAdapterBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "<Enter user name here>";
credentials.UserName.Password = "<Enter password here>";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
listener = binding.BuildChannelListener<IInputChannel>(ConnectionUri, bindingParams);
listener.Open();
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel and Listener opened...");
Console.WriteLine("\nWaiting for polled data...");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
XmlDocument doc = new XmlDocument();
doc.Load(reader);
using (XmlWriter writer = XmlWriter.Create("C:\\PollingOutput.xml"))
{
doc.WriteTo(writer);
Console.WriteLine("The polling response is saved at 'C:\\PollingOutput.xml'");
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}