使用 WCF 服务模型从SQL Server接收基于强类型轮询的数据更改消息

可以将 SQL 适配器配置为接收来自SQL Server的强类型轮询消息。 可以指定适配器为轮询数据库而执行的轮询语句。 轮询语句可以是 SELECT 语句,也可以是返回结果集的存储过程。 在想要接收强类型结果集的方案中,必须使用强类型轮询。 有关适配器如何支持强类型轮询的详细信息,请参阅 支持使用轮询的入站调用

重要

如果要在单个应用程序中执行多个轮询操作,则必须将 InboundID 连接属性指定为连接 URI 的一部分,使其唯一。 指定的入站 ID 将添加到操作命名空间,使其唯一。

本主题如何演示轮询

在本主题中,若要演示 SQL 适配器如何支持接收强类型数据更改消息,请创建 .NET 应用程序并为 TypedPolling 操作生成 WCF 服务协定。 请确保在生成 WCF 服务协定时指定以下内容:

  • 必须将 InboundID 指定为连接 URI 的一部分。

  • 必须为 PollingStatement 绑定属性指定轮询语句。

    此外,如果要在生成代理类时指定其他与轮询相关的绑定属性,请将 PolledDataAvailableStatement 指定为:

SELECT COUNT(*) FROM Employee

PolledDataAvailableStatement 必须返回包含正值的第一个单元格的结果集。 如果第一个单元格不包含正值,则适配器不会执行轮询语句。

作为轮询语句的一部分,执行以下操作:

  1. 从 Employee 表中选择所有行。

  2. (MOVE_EMP_DATA) 执行存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。

  3. 执行存储过程 (ADD_EMP_DETAILS) 向 Employee 表添加新记录。 此过程采用员工姓名、指定和工资作为参数。

    若要执行这些操作,必须在生成 WCF 服务协定和帮助程序类时为 PollingStatement 绑定属性指定以下内容:

SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000

执行轮询语句后,将选中 Employee 表中的所有记录,并接收来自SQL Server的消息。 适配器执行MOVE_EMP_DATA存储过程后,所有记录将移动到 EmployeeHistory 表。 然后,执行ADD_EMP_DETAILS存储过程以将新记录添加到 Employee 表。 下一次轮询执行将仅返回一条记录。 此周期一直持续到关闭服务主机为止。

使用 SQL 适配器绑定属性配置类型化轮询

下表汇总了用于配置适配器以接收数据更改消息的 SQL 适配器绑定属性。 运行 .NET 应用程序时,除了 PollingStatement 绑定属性外,本节中列出的所有其他绑定属性都是必需属性。 在生成 WCF 服务协定 TypedPolling 操作之前,必须指定 PollingStatement 绑定属性。

Binding 属性 说明
InboundOperationType 指定是要执行 轮询TypedPolling 还是 通知 入站操作。 默认值为 轮询。 若要接收强类型轮询消息,请将此项设置为 TypedPolling
PolledDataAvailableStatement 指定适配器执行的 SQL 语句,以确定是否有任何数据可用于轮询。 SQL 语句必须返回由行和列组成的结果集。 仅当行可用时,才会执行为 PollingStatement 绑定属性指定的 SQL 语句。
PollingIntervalInSeconds 指定 SQL 适配器执行为 PolledDataAvailableStatement 绑定属性指定的语句的时间间隔(以秒为单位)。 默认为 30 秒。 轮询间隔确定连续轮询之间的时间间隔。 如果在指定的时间间隔内执行语句,适配器将等待间隔中的剩余时间。
PollingStatement 指定用于轮询SQL Server数据库表的 SQL 语句。 可以为轮询语句指定简单的 SELECT 语句或存储过程。 默认值为 NULL。 必须指定 PollingStatement 的值才能启用轮询。 仅当有可供轮询的数据(由 PolledDataAvailableStatement 绑定属性确定)时,才会执行轮询语句。 可以指定任意数量的用分号分隔的 SQL 语句。 重要: 对于 TypedPolling,必须在生成元数据之前指定此绑定属性。
PollWhileDataFound 指定 SQL 适配器是否忽略轮询间隔并持续执行为 PolledDataAvailableStatement 绑定属性指定的 SQL 语句(如果数据在要轮询的表中可用)。 如果表中没有可用数据,适配器将还原为按指定的轮询间隔执行 SQL 语句。 默认值为 false

有关这些属性的更完整说明,请参阅阅读有关 SQL Server 适配器绑定属性的 BizTalk 适配器。 有关如何使用 SQL 适配器轮询SQL Server的完整说明,请进一步阅读。

在 WCF 服务模型中配置强类型轮询

若要在使用 WCF 服务模型时接收 轮询 操作,必须:

  1. 从适配器公开的元数据生成 wcf 服务协定 (接口) TypedPolling 操作。 为此,可以使用添加适配器服务引用 Visual Studio 插件。 为此示例生成 WCF 服务协定时,请确保:

    • InboundID 指定为 Employee

    • PollingStatement 绑定属性指定轮询语句。 对于此示例,将轮询语句指定为:

      SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
      
  2. 从此接口实现 WCF 服务。

  3. 使用服务主机 (System.ServiceModel.ServiceHost) 托管此 WCF 服务。

关于本主题中使用的示例

本主题中的示例轮询 Employee 表。 该示例还使用 MOVE_EMP_DATA 和 ADD_EMP_DETAILS 存储过程。 示例提供了生成这些项目的脚本。 有关示例的详细信息,请参阅 SQL 适配器的示例。 SQL 适配器示例还提供了基于本主题 的示例 TypedPolling_ServiceModel

WCF 服务协定和类

可以使用添加适配器服务引用插件创建 WCF 服务协定 (接口) 和 TypedPolling 操作的支持类。 有关生成 WCF 服务协定的详细信息,请参阅为SQL Server项目生成 WCF 客户端或 WCF 服务协定

WCF 服务协定 (接口)

以下代码显示为 TypedPolling 操作生成的 WCF 服务协定 (接口) 。

[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.ServiceContractAttribute(Namespace="http://schemas.microsoft.com/Sql/2008/05/", ConfigurationName="TypedPolling_Employee")]
public interface TypedPolling_Employee {

    // CODEGEN: Generating message contract since the wrapper namespace (https://schemas.microsoft.com/Sql/2008/05/TypedPolling/Employee) of message TypedPolling
    // does not match the default value (https://schemas.microsoft.com/Sql/2008/05/)
    [System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="TypedPolling")]
    void TypedPolling(TypedPolling request);
}

消息协定

消息协定命名空间由连接 URI 中的 InboundID 参数修改(如果已指定)。 在此示例中,将入站 ID 指定为 Employee。 请求消息返回强类型结果集。

[System.Diagnostics.DebuggerStepThroughAttribute()]
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.MessageContractAttribute(WrapperName="TypedPolling", WrapperNamespace="http://schemas.microsoft.com/Sql/2008/05/TypedPolling/Employee", IsWrapped=true)]
public partial class TypedPolling {

[System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://schemas.microsoft.com/Sql/2008/05/TypedPolling/Employee", Order=0)]
    public schemas.microsoft.com.Sql._2008._05.TypedPolling.Employee.TypedPollingResultSet0[] TypedPollingResultSet0;

[System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://schemas.microsoft.com/Sql/2008/05/TypedPolling/Employee", Order=1)]
    public schemas.microsoft.com.Sql._2008._05.TypedPolling.Employee.TypedPollingResultSet1[] TypedPollingResultSet1;

    public TypedPolling() {
    }

    public TypedPolling(schemas.microsoft.com.Sql._2008._05.TypedPolling.Employee.TypedPollingResultSet0[] TypedPollingResultSet0, schemas.microsoft.com.Sql._2008._05.TypedPolling.Employee.TypedPollingResultSet1[] TypedPollingResultSet1) {
        this.TypedPollingResultSet0 = TypedPollingResultSet0;
        this.TypedPollingResultSet1 = TypedPollingResultSet1;
    }
}

WCF 服务类

添加适配器服务引用插件还会生成一个文件,该文件包含从服务协定 (接口) 实现的 WCF 服务类的存根。 文件的名称为 SqlAdapterBindingService.cs。 可以插入逻辑以直接在此类中处理 TypedPolling 操作。 以下代码显示由添加适配器服务引用插件生成的 WCF 服务类。

namespace SqlAdapterBindingNamespace {

    public class SqlAdapterBindingService : TypedPolling_Employee {

        // CODEGEN: Generating message contract since the wrapper namespace (https://schemas.microsoft.com/Sql/2008/05/TypedPolling/Employee) of message TypedPolling
        // does not match the default value (https://schemas.microsoft.com/Sql/2008/05/)
        public virtual void TypedPolling(TypedPolling request) {
            throw new System.NotImplementedException("The method or operation is not implemented.");
        }
    }
}

接收用于轮询操作的强类型入站消息

本部分介绍如何编写 .NET 应用程序以使用 SQL 适配器接收强类型入站轮询消息。

  1. 使用添加适配器服务引用插件生成 WCF 服务协定, (TypedPolling 操作的接口) 和帮助程序类。 请确保在为此示例生成 WCF 服务协定时指定以下内容:

    • 必须将 InboundID 指定为 Employee

    • 必须为 PollingStatement 绑定属性指定轮询语句。 对于此示例,将轮询语句指定为:

      SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
      

      有关详细信息,请参阅为SQL Server项目生成 WCF 客户端或 WCF 服务协定。 可以选择在生成服务协定和帮助程序类时指定绑定属性。 这可以保证在生成的配置文件中正确设置它们。

  2. 从步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果处理从 TypedPolling 操作接收的数据时遇到错误,此类的 TypedPolling 方法可能会引发异常以中止轮询事务;否则, 方法不返回任何内容。 必须按如下所示对 WCF 服务类进行特性化:

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    

    TypedPolling 方法中,可以直接实现应用程序逻辑。 可以在 SqlAdapterBindingService.cs 中找到此类。 此示例中的此代码子类为 SqlAdapterBindingService 类。 在此代码中,作为强类型结果集收到的轮询消息将写入控制台。

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    
    public class PollingService : SqlAdapterBindingNamespace.SqlAdapterBindingService
    {
        public override void TypedPolling(TypedPolling request)
        {
            Console.WriteLine("\nNew Polling Records Received");
            Console.WriteLine("*************************************************");
            Console.WriteLine("Employee ID\tName\tDesignation\tSalary");
    
            for (int i = 0; i < request.TypedPollingResultSet0.Length; i++)
            {
                Console.WriteLine("{0}\t{1}\t{2}\t{3}",
                request.TypedPollingResultSet0[i].Employee_ID,
                request.TypedPollingResultSet0[i].Name,
                request.TypedPollingResultSet0[i].Designation,
                request.TypedPollingResultSet0[i].Salary);
            }
            Console.WriteLine("*************************************************");
            Console.WriteLine("\nHit <RETURN> to stop polling");
        }
    }
    
  3. 由于 SQL 适配器不接受凭据作为连接 URI 的一部分,因此必须实现以下类才能传递SQL Server数据库的凭据。 在应用程序的后一部分中,将实例化此类以传递SQL Server凭据。

    class PollingCredentials : ClientCredentials, IServiceBehavior
    {
        public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters)
        {
            bindingParameters.Add(this);
        }
    
        public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        { }
    
        public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        { }
    
        protected override ClientCredentials CloneCore()
        {
            ClientCredentials clone = new PollingCredentials();
            clone.UserName.UserName = this.UserName.UserName;
            clone.UserName.Password = this.UserName.Password;
            return clone;
        }
    }
    
  4. 创建 SqlAdapterBinding 并通过指定绑定属性来配置轮询操作。 可以在代码中显式执行此操作,也可以在配置中以声明方式执行此操作。 至少必须指定 InboundOperationTypePolledDataAvailableStatementPollingStatement

    SqlAdapterBinding binding = new SqlAdapterBinding();
    binding.InboundOperationType = InboundOperation.TypedPolling;
    binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
    binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
    
  5. 通过实例化在步骤 3 中创建的 PollingCredentials 类,指定SQL Server数据库凭据。

    PollingCredentials credentials = new PollingCredentials();
    credentials.UserName.UserName = "<Enter user name here>";
    credentials.UserName.Password = "<Enter password here>";
    
  6. 创建在步骤 2 中创建的 WCF 服务的实例。

    // create service instance
    PollingService service = new PollingService();
    
  7. 使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基本连接 URI 不能包含入站 ID。 还必须在此处指定凭据。

    // Enable service host
    Uri[] baseUri = new Uri[] { new Uri("mssql://mysqlserver//mydatabase") };
    ServiceHost serviceHost = new ServiceHost(service, baseUri);
    serviceHost.Description.Behaviors.Add(credentials);
    
    
  8. 将服务终结点添加到服务主机。 为此,请按以下步骤操作:

    • 使用在步骤 4 中创建的绑定。

    • 指定包含凭据的连接 URI,并根据需要指定入站 ID。

    • 将协定指定为“TypedPolling_Employee”。

    // Add service endpoint: be sure to specify TypedPolling_Employee as the contract
    Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?InboundID=Employee");
    serviceHost.AddServiceEndpoint("TypedPolling_Employee", binding, ConnectionUri);
    
  9. 若要接收轮询数据,请打开服务主机。 每当查询返回结果集时,适配器都将返回数据。

    // Open the service host to begin polling
    serviceHost.Open();
    
  10. 若要终止轮询,请关闭服务主机。

    重要

    适配器将继续轮询,直到服务主机关闭。

    serviceHost.Close();
    

示例

以下示例演示执行 Employee 表的轮询查询。 轮询语句执行以下任务:

  1. 从 Employee 表中选择所有记录。

  2. 执行MOVE_EMP_DATA存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。

  3. 执行ADD_EMP_DETAILS存储过程,将单个记录添加到 Employee 表。

    第一条轮询消息将包含 Employee 表中的所有记录。 后续轮询消息将仅包含ADD_EMP_DETAILS存储过程插入的最后一条记录。 适配器将继续轮询,直到你通过按 <RETURN>关闭服务主机。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using Microsoft.Adapters.Sql;
using Microsoft.ServiceModel.Channels;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Channels;
using System.Collections.ObjectModel;

namespace TypedPolling_ServiceModel
{
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

    public class PollingService : SqlAdapterBindingNamespace.SqlAdapterBindingService
    {
        public override void TypedPolling(TypedPolling request)
        {
            Console.WriteLine("\nNew Polling Records Received");
            Console.WriteLine("*************************************************");
            Console.WriteLine("Employee ID\tName\tDesignation\tSalary");

            for (int i = 0; i < request.TypedPollingResultSet0.Length; i++)
            {
                Console.WriteLine("{0}\t{1}\t{2}\t{3}",
                request.TypedPollingResultSet0[i].Employee_ID,
                request.TypedPollingResultSet0[i].Name,
                request.TypedPollingResultSet0[i].Designation,
                request.TypedPollingResultSet0[i].Salary);
            }
            Console.WriteLine("*************************************************");
            Console.WriteLine("\nHit <RETURN> to stop polling");
        }
    }

    class PollingCredentials : ClientCredentials, IServiceBehavior
    {
        public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters)
        {
            bindingParameters.Add(this);
        }

        public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        { }

        public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        { }

        protected override ClientCredentials CloneCore()
        {
            ClientCredentials clone = new PollingCredentials();
            clone.UserName.UserName = this.UserName.UserName;
            clone.UserName.Password = this.UserName.Password;
            return clone;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            ServiceHost serviceHost = null;
            try
            {
                Console.WriteLine("Sample started...");
                Console.WriteLine("Press any key to start polling...");
                Console.ReadLine();

                SqlAdapterBinding binding = new SqlAdapterBinding();
                binding.InboundOperationType = InboundOperation.TypedPolling;
                binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
                binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
                Console.WriteLine("Binding properties assigned...");

                // This URI is used to specify the address for the ServiceEndpoint
                // It must contain the InboundId that was used to generate
                // the WCF service callback interface
                Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?InboundId=Employee");

                // This URI is used to initialize the ServiceHost. It cannot contain
                // the InboundID; otherwise,an exception is thrown when
                // the ServiceHost is initialized.
                Uri[] baseUri = new Uri[] { new Uri("mssql://mysqlserver//mydatabase") };

                PollingCredentials credentials = new PollingCredentials();
                credentials.UserName.UserName = "<Enter user name here>";
                credentials.UserName.Password = "<Enter password here>";

                Console.WriteLine("Opening service host...");
                PollingService service = new PollingService();
                serviceHost = new ServiceHost(service, baseUri);
                serviceHost.Description.Behaviors.Add(credentials);
                serviceHost.AddServiceEndpoint("TypedPolling_Employee", binding, ConnectionUri);
                serviceHost.Open();
                Console.WriteLine("Service host opened...");
                Console.WriteLine("Polling started...");
                Console.ReadLine();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception :" + e.Message);
                Console.ReadLine();

                /* If there is an error it will be specified in the inner exception */
                if (e.InnerException != null)
                {
                    Console.WriteLine("InnerException: " + e.InnerException.Message);
                    Console.ReadLine();
                }
            }
            finally
            {
                // IMPORTANT: you must close the ServiceHost to stop polling
                if (serviceHost.State == CommunicationState.Opened)
                    serviceHost.Close();
                else
                    serviceHost.Abort();
            }
        }
    }
}

另请参阅

使用 SQL 适配器和 WCF 服务模型轮询SQL Server