流处理源示例
StreamingFeeds 示例演示如何管理包含大量项的联合源。 在服务器上,此示例演示如何延迟在源中创建各个 SyndicationItem 对象,一直到项就要被写入网络流之前。
在客户端上,该示例演示如何使用自定义联合源格式化程序读取网络流中的各个项,以使当前读取的源永远不会完全缓冲到内存中。
为了最充分地演示联合 API 的流处理功能,此示例使用了一个似乎不可能的方案,在这个方案中,服务器公开一个包含无数个项的源。 在这种情况下,服务器继续在源中生成新的项,一直到它认为客户端已从源中读取指定数目(默认为 10)的项为止。 为简单起见,客户端和服务器是在同一个进程中实现的,并且使用一个共享 ItemCounter
对象跟踪客户端生成的项数。 ItemCounter
类型存在的唯一目的是为了使示例方案能够完全终止,它并不是所演示的模式的核心元素。
该演示使用 Visual C# 迭代器(使用 yield return
关键字构造)。 有关迭代器的详细信息,请参阅 MSDN 上的“使用迭代器”主题。
服务
服务实现包含一个操作的基本 WebGetAttribute 协定,如下面的代码所示。
[ServiceContract]
interface IStreamingFeedService
{
[WebGet]
[OperationContract]
Atom10FeedFormatter StreamedFeed();
}
服务通过使用 ItemGenerator
类创建一个可能无限的 SyndicationItem 实例流(使用迭代器)来实现此协定,如下面的代码所示。
class ItemGenerator
{
public IEnumerable<SyndicationItem> GenerateItems()
{
while (counter.GetCount() < maxItemsRead)
{
itemsReturned++;
yield return CreateNextItem();
}
}
...
}
当服务实现创建源时,使用 ItemGenerator.GenerateItems()
的输出,而不是使用缓冲的项集合。
public Atom10FeedFormatter StreamedFeed()
{
SyndicationFeed feed = new SyndicationFeed("Streamed feed", "Feed to test streaming", null);
//Generate an infinite stream of items. Both the client and the service share
//a reference to the ItemCounter, which allows the sample to terminate
//execution after the client has read 10 items from the stream
ItemGenerator itemGenerator = new ItemGenerator(this.counter, 10);
feed.Items = itemGenerator.GenerateItems();
return feed.GetAtom10Formatter();
}
因此,项流永远不会完全缓冲到内存中。 通过在 ItemGenerator.GenerateItems()
方法内的 yield return
语句中设置一个断点,并注意此断点是服务在返回 StreamedFeed()
方法的结果后第一次遇到,可以观察到此行为。
客户端
此示例中的客户端使用自定义 SyndicationFeedFormatter 实现,该实现延迟各个项在源中的具体化,而不是将它们缓冲到内存中。 自定义 StreamedAtom10FeedFormatter
实例的使用如下所示。
XmlReader reader = XmlReader.Create("http://localhost:8000/Service/Feeds/StreamedFeed");
StreamedAtom10FeedFormatter formatter = new StreamedAtom10FeedFormatter(counter);
SyndicationFeed feed = formatter.ReadFrom(reader);
通常,在从网络中读取源的全部内容并将内容缓冲到内存中之前,并不会返回对 ReadFrom(XmlReader) 的调用。 但是,StreamedAtom10FeedFormatter
对象重写了 ReadItems(XmlReader, SyndicationFeed, Boolean) 以返回迭代器,而不是返回缓冲的集合,如下面的代码所示。
protected override IEnumerable<SyndicationItem> ReadItems(XmlReader reader, SyndicationFeed feed, out bool areAllItemsRead)
{
areAllItemsRead = false;
return DelayReadItems(reader, feed);
}
private IEnumerable<SyndicationItem> DelayReadItems(XmlReader reader, SyndicationFeed feed)
{
while (reader.IsStartElement("entry", "http://www.w3.org/2005/Atom"))
{
yield return this.ReadItem(reader, feed);
}
reader.ReadEndElement();
}
因此,在遍历 ReadItems()
结果的客户端应用程序准备好使用每个项之前,并不会从网络中读取每个项。 通过在 StreamedAtom10FeedFormatter.DelayReadItems()
内的 yield return
语句中设置一个断点,并注意此断点是对 ReadFrom()
的调用完成后第一次遇到,可以观察到此行为。
下面的说明演示如何生成并运行示例。 请注意,尽管服务器在客户端读取 10 个项后停止生成项,但输出显示客户端读取的项数远远超过 10 个。 这是因为示例使用的网络绑定以 4 KB 段传输数据。 因此,客户端在有机会读取一个项之前,已收到 4KB 的项数据。 这是正常的行为(以合理大小的段发送经过流处理的 HTTP 数据可以提高性能)。
设置、生成和运行示例
若要生成 C# 或 Visual Basic .NET 版本的解决方案,请按照 Building the Windows Communication Foundation Samples中的说明进行操作。
要使用单机配置或跨计算机配置来运行示例,请按照运行 Windows Communication Foundation 示例中的说明进行操作。