使用英语阅读

通过


Orleans 中的流式处理

Orleans v.1.0.0 添加了对编程模型的流式处理扩展的支持。 流式处理扩展提供一组抽象和 API,使流的设计思路和用法变得更简单、更可靠。 开发人员可以通过流式处理扩展编写以结构化方式对一系列事件进行操作的反应式应用程序。 流提供程序的扩展性模型使编程模型可以保持兼容,并可以在各种现有队列技术(例如事件中心服务总线Azure 队列Apache Kafka)之间移植。 无需编写特殊代码或运行专用进程即可与此类队列交互。

为何应该考虑 Orleans 流?

如果你已了解流处理并熟悉事件中心KafkaAzure 流分析Apache StormApache Spark 流式处理.NET 中的反应式扩展 (Rx) 等技术,你可能会问自己为何应考虑 Orleans 流。 为何我们还需要另一个流处理系统,以及执行组件与流之间有怎样的关系?“为何需要 Orleans 流?”回答了此问题。

编程模型

Orleans 流编程模型背后有多个原理:

  1. Orleans 流是虚拟的。 也就是说,流始终存在。 不能显式创建或销毁流,并且流永远不能失败。
  2. 流由流 ID 标识,流 ID 只是由 GUID 和字符串组成的逻辑名称。
  3. Orleans 流允许在时间和空间上将数据的生成与其处理相解耦。 这意味着,流生成者和流使用者可以位于不同的服务器或不同的时区中,并且可以承受故障。
  4. Orleans 流是轻型、动态的。 Orleans 流式处理运行时可以处理大量频繁传入和传出的流。
  5. Orleans 流绑定是动态的。 Orleans 流式处理运行时可以处理 grain 频繁与流连接和断开连接的情况。
  6. Orleans 流式处理运行时以透明方式管理流使用生命周期。 应用程序订阅流后,将接收流的事件,即使存在故障。
  7. Orleans 流在 grain 和 Orleans 客户端之间以统一的方式工作。

编程 API

应用程序使用实现 Orleans.Streams.IAsyncObserver<T>Orleans.Streams.IAsyncObservable<T> 接口的 Orleans.Streams.IAsyncStream<T> 来与流交互。 这些 API 类似于众所周知的 .NET 中的反应式扩展 (Rx)

在以下典型示例中,设备生成一些数据,这些数据作为 HTTP 请求发送到云中运行的服务。 在前端服务器中运行的 Orleans 客户端接收此 HTTP 调用,并将数据发布到匹配的设备流中:

C#
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
     // Post data directly into the device's stream.
     IStreamProvider streamProvider =
        GrainClient.GetStreamProvider("MyStreamProvider");

    IAsyncStream<DeviceEventData> deviceStream =
        streamProvider.GetStream<DeviceEventData>(
            deviceEvent.DeviceId, "MyNamespace");

     await deviceStream.OnNextAsync(deviceEvent.Data);
}

在下面的另一个示例中,聊天用户(实现为 Orleans Grain)加入聊天室,获取此聊天室中所有其他用户生成的聊天消息流的句柄,并订阅该流。 请注意,聊天用户不需要知道聊天室 grain 本身(我们的系统中可能没有此类 grain)或该组中生成消息的其他用户。 不用说,若要发布到聊天流,用户不需要知道谁当前订阅了该流。 此示例演示了聊天用户如何在时间和空间上完全解耦。

C#
public class ChatUser: Grain
{
    public async Task JoinChat(Guid chatGroupId)
    {
        IStreamProvider streamProvider =
            base.GetStreamProvider("MyStreamProvider");

        IAsyncStream<string> chatStream =
            streamProvider.GetStream<string>(chatGroupId, "MyNamespace");

        await chatStream.SubscribeAsync(
            async (message, token) => Console.WriteLine(message))
    }
}

快速入门示例

快速入门示例很好地提供了在应用程序中使用流的整个工作流的快速概述。 阅读此示例后,应该阅读流编程 API 以更深入地了解概念。

流编程 API

流编程 API 提供编程 API 的详细说明。

流提供程序

流可以通过各种形状和形式的物理通道传送,并可以采用不同的语义。 Orleans 流式处理可以通过流提供程序的概念来支持这种多样性,这是系统中的一个扩展点。 Orleans 目前实现两个流提供程序:基于 TCP 的简单消息流提供程序和基于 Azure 队列的 Azure 队列流提供程序。 有关流提供程序的更多详细信息,请参阅流提供程序

流语义

流订阅语义:

Orleans 流保证流订阅操作的顺序一致性。 具体而言,当某个使用者订阅某个流时,一旦成功解析代表订阅操作的 Task,该使用者就会看到其订阅后生成的所有事件。 此外,可倒退流允许使用 StreamSequenceToken 从过去的任意时间点订阅。 有关详细信息,请参阅 Orleans 流提供程序

单个流事件传递保证:

单个事件传递保证取决于单个流提供程序。 一些保证机制根据“尽力而为”的原则提供“最多传递一次”保证(例如简单消息流 (SMS)),而另一些机制则提供“最少传递一次”保证(例如 Azure 队列流)。 甚至可以构建一个流式处理提供程序来保证“正好传递一次”(我们还没有此类提供程序,但你可以构建一个)。

事件传递顺序:

事件顺序也取决于特定的流提供程序。 在 SMS 流中,生成者通过控制其发布事件的方式,来显式控制使用者看到的事件的顺序。 Azure 队列流不保证 FIFO 顺序,因为基础 Azure 队列不保证在发生故障的情况下的顺序。 应用程序还可以使用 StreamSequenceToken 来控制其流传递顺序。

流实现

Orleans 流实现提供内部实现的概要概述。

代码示例

有关如何在 grain 中使用流式处理 API 的更多示例,请参阅此文。 我们计划在将来创建更多示例。

另请参阅