다음을 통해 공유


StreamInsight手札(三)——使用IObservable接口创建StreamInsight程序

本文将介绍如何利用IObservable接口创建StreamInsight应用程序。

Observer是.net Framework 4中引入的开发模式。IObserver<T>IObservable<T>接口为基于推送的通知提供通用机制,也称为观察者设计模式。IObservable<T> 接口表示发送通知(提供程序)的类;IObserver<T> 接口表示接收通知(观察器)的类。

在下列应用场景下,适合采用Observable模式:

  1. 历史数据查询。对有限长的大数据集进行临时的查询。
  2. Ad hoc查询。对数据进行一系列交互的ad hoc查询。
  3. 内嵌于用户程序。StreamInsight内嵌于用户应用程序之中。

使用这种开发模型,开发者只需要:

  1. 实现IObservable接口
  2. 实现IObserver接口
  3. 绑定到查询

下面将用示例代码来说明.。

1、安装Reactive Extension for .Net 4: 

2、新建一个工程,注意选择.net framework 4.0。

3、添加如下引用。System.CoreEx和System.Reactive在C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4\目录下。

定义事件Payload:

namespace HelloInsightObservable

{

    public class HelloPayload

    {

        public int value { get; set; }

    }

}

5、定义InputObservable类实现Iobservable接口

public class InputObservable : IObservable<int>

这个类需要实现Subscribe方法:

public IDisposable Subscribe(IObserver<int> observer)

{

    if (observer != null && !this.observers.Contains(observer))

    {

        this.observers.Add(observer);

    }

    return observer as IDisposable;

}

我们还需要在这个类里模拟输入。我们定义了一个Timer,当定时器时间到时,调用定时器的回调函数生成一个随机整数,并调用observer的OnNext方法推送数据。

private void GenerateInput(object _)

{

    foreach (var observer in observers)

    {

        int value = random.Next(100);

        Console.WriteLine("Random generated data {0} : {1}", generatedNumber, value);

        observer.OnNext(value);

        generatedNumber++;

        if (generatedNumber >= dataNumber)

        {

            observer.OnCompleted();

            timer.Change(Timeout.Infinite, timeSpan);

            return;

        }

    }

    timer.Change(timeSpan, timeSpan);

}

6、将Observable类作为CEP引擎的输入。

在Main函数中添加如下代码。注意加亮部分的代码,这里与适配器方式的程序不同的是,没有插入CTI事件。

var inputObservable = new InputObservable(10);

using (var server = Server.Create("DefaultInstance"))

{

    var application = server.CreateApplication("Observable Application");

    var stream = inputObservable.ToPointStream(application,

            e => PointEvent.CreateInsert(DateTime.Now, new HelloPayload { value = e }),

            AdvanceTimeSettings.StrictlyIncreasingStartTime,

            "Observable Stream");

    var query = from e in stream

                where e.value > 50

    select e;

}

7、定义OutputObserver类实现Iobserver接口。

class OutputObserver : IObserver<HelloPayload>

{

    public virtual void OnCompleted()

    {

        Console.WriteLine("Stopping query...");

    }

    public virtual void OnError(Exception e)

    {

        Console.WriteLine("Unexpected error occured");

    }

    public virtual void OnNext(HelloPayload payload)

    {

        Console.WriteLine("DataOutput: {0}", payload.value);

    }

}

8、将OutputObserver绑定到query。

在main函数中添加如下代码:

var outputObserver = new OutputObserver();

var outputObservable = query.ToObservable();

outputObservable.Subscribe(outputObserver);

Console.ReadLine();

Subscribe函数执行后,就会开始接收数据输入,并调用OnNext方法输出。注意最后一句话的作用是让程序继续运行直至数据输入完成,Observable调用Observer的OnComplete方法,或者用户中途通过输入任意字符停止Observer。

9、程序运行的结果如下:

软件测试工程师 金晶

HelloInsight.zip