流处理源示例

StreamingFeeds 示例演示如何管理包含大量条目的订阅源。 在服务器上,此示例演示如何延迟在源中创建各个 SyndicationItem 对象,一直到项就要被写入网络流之前。

在客户端上,该示例演示如何使用自定义联合供稿格式化程序从网络流读取单个项,以便读取的供稿永远不会被完全缓冲到内存中。

为了最充分地演示联合 API 的流处理功能,此示例使用了一个似乎不可能的方案,在这个方案中,服务器公开一个包含无数个项的源。 在这种情况下,服务器会继续将新项添加到信息流中,直到确定客户端已从信息流读取指定数量的项目(默认情况下为 10)。 为简单起见,客户端和服务器都在同一进程中实现,并使用共享 ItemCounter 对象跟踪客户端生成的项数。 该 ItemCounter 类型仅用于允许示例方案完全终止,并且不是所演示模式的核心元素。

该演示使用 Visual C# 迭代器(使用 yield return 关键字构造)。 有关迭代器的详细信息,请参阅 MSDN 上的“使用迭代器”主题。

服务

该服务实现了一个由一个操作组成的基本 WebGetAttribute 契约,如以下代码所示。

[ServiceContract]
interface IStreamingFeedService
{
    [WebGet]
    [OperationContract]
    Atom10FeedFormatter StreamedFeed();
}

该服务通过使用 ItemGenerator 类通过迭代器创建可能无限的 SyndicationItem 实例流来实现此协定,如以下代码所示。

class ItemGenerator
{
    public IEnumerable<SyndicationItem> GenerateItems()
    {
        while (counter.GetCount() < maxItemsRead)
        {
            itemsReturned++;
            yield return CreateNextItem();
        }

    }
    ...
}

当服务实现创建源时,使用 ItemGenerator.GenerateItems() 的输出,而不是使用缓冲的项集合。

public Atom10FeedFormatter StreamedFeed()
{
    SyndicationFeed feed = new SyndicationFeed("Streamed feed", "Feed to test streaming", null);
    //Generate an infinite stream of items. Both the client and the service share
    //a reference to the ItemCounter, which allows the sample to terminate
    //execution after the client has read 10 items from the stream
    ItemGenerator itemGenerator = new ItemGenerator(this.counter, 10);

    feed.Items = itemGenerator.GenerateItems();
    return feed.GetAtom10Formatter();
}

因此,数据流不会完全缓冲到内存中。 通过在 yield return 方法内的 ItemGenerator.GenerateItems() 语句中设置一个断点,并注意此断点是服务在返回 StreamedFeed() 方法的结果后第一次遇到,可以观察到此行为。

客户

在此示例中,客户端使用自定义 SyndicationFeedFormatter 实现来延迟数据流中各个项的实例化,而不是将它们缓冲到内存中。 自定义 StreamedAtom10FeedFormatter 实例如下所示。

XmlReader reader = XmlReader.Create("http://localhost:8000/Service/Feeds/StreamedFeed");
StreamedAtom10FeedFormatter formatter = new StreamedAtom10FeedFormatter(counter);

SyndicationFeed feed = formatter.ReadFrom(reader);

通常,调用 ReadFrom(XmlReader) 在从网络读取完数据流的全部内容并将其缓冲到内存中之前不会返回。 但是,对象 StreamedAtom10FeedFormatter 重写 ReadItems(XmlReader, SyndicationFeed, Boolean) 以返回迭代器而不是缓冲集合,如以下代码所示。

protected override IEnumerable<SyndicationItem> ReadItems(XmlReader reader, SyndicationFeed feed, out bool areAllItemsRead)
{
    areAllItemsRead = false;
    return DelayReadItems(reader, feed);
}

private IEnumerable<SyndicationItem> DelayReadItems(XmlReader reader, SyndicationFeed feed)
{
    while (reader.IsStartElement("entry", "http://www.w3.org/2005/Atom"))
    {
        yield return this.ReadItem(reader, feed);
    }

    reader.ReadEndElement();
}

因此,只有当客户端应用程序在遍历ReadItems() 的结果并准备好使用时,才会从网络中读取每个项。 通过在 yield return 内的 StreamedAtom10FeedFormatter.DelayReadItems() 语句中设置一个断点,并注意此断点是对 ReadFrom() 的调用完成后第一次遇到,可以观察到此行为。

以下说明演示如何生成和运行示例。 请注意,尽管服务器在客户端读取了 10 个项目后停止生成项,但输出显示客户端读取的项明显超过 10 个项目。 这是因为示例使用的网络绑定以四千字节(KB)段传输数据。 因此,客户端在有机会读取一个项目之前接收 4KB 的项目数据。 这是正常行为(在合理大小的段中发送流式 HTTP 数据会增加性能)。

设置、生成和运行示例

  1. 确保已为 Windows Communication Foundation 示例 执行One-Time 安装过程。

  2. 若要生成解决方案的 C# 或 Visual Basic .NET 版本,请按照 生成 Windows Communication Foundation 示例中的说明进行操作。

  3. 若要在单台计算机或跨计算机配置中运行示例,请按照 运行 Windows Communication Foundation 示例中的说明进行操作。

另请参阅