Compartilhar via


Operadores de fluxo definidos pelo usuário

Um operador de fluxo definido pelo usuário permite a definição do processamento personalizado dos fluxos de eventos.

Padrão de uso

Em uma consulta, você chama um operador de fluxo definido pelo usuário utilizando o método de extensão Scan do CepStream. Você fornece o fluxo de entrada e o estado inicial do operador, conforme mostrado no exemplo a seguir.

var query = input.Scan(new SmoothingOperator(0.7));

O autor do operador deriva uma nova classe dos tipos abstratos CepPointStreamOperator ou CepEdgeStreamOperator. O novo tipo encapsula a máquina de estado do operador. Uma chamada para o construtor desse tipo é passada para o operador de exame para estabelecer o estado inicial do operador.

Características de um operador de fluxo definido pelo usuário

Um operador de fluxo definido pelo usuário permite a interação do usuário com o fluxo de eventos de modo processual. Assim, ele representa um limite entre o mecanismo de processamento de eventos do StreamInsight e o CLR, semelhante aos adaptadores e às agregações ou operadores definidos pelo usuário. Em todos esses casos, o mecanismo e o desenvolvedor estão cumprindo um contrato sobre as propriedades temporais de um fluxo. O mecanismo do StreamInsight oferece as seguintes garantias ao operador de fluxo definido pelo usuário a fim para se comportar de forma determinística em uma sequência específica de eventos de entrada de consulta:

  • Um operador de fluxo definido pelo usuário receberá os eventos ordenados por hora de sincronização (hora de início dos eventos pontuais e eventos de borda de início; hora de término dos eventos de borda de fim). Não há suporte para os eventos de intervalo porque eles não têm uma representação direta ordenada por hora de sincronização, já que cada evento indica dois momentos determinados, um inicial e um final.

  • Somente os eventos de inserção são passados para um operador de fluxo definido pelo usuário. Os eventos CTI (incremento de tempo atual) do fluxo de entrada são transparentes para o operador de fluxo definido pelo usuário, mas eles ainda determinam como o operador de fluxo definido pelo usuário percebe a passagem de tempo (consulte NextCti a seguir).

  • Um operador de fluxo definido pelo usuário pode ser desativado pelo StreamInsight dependendo do local em que ele permite isso (consulte IsEmpty a seguir). Um operador de fluxo desativado definido pelo usuário pode ser reciclado pelo StreamInsight.

  • Cada evento de inserção faz com que ProcessEvent seja chamado, seguido pela sondagem das propriedades NextCti e IsEmpty.

Entrada e saída de um operador de fluxo definido pelo usuário

Um operador de fluxo definido pelo usuário processa um evento de entrada por vez. Em resposta a cada evento de entrada, ele pode produzir 0-* eventos de saída. O operador também pode atualizar seu estado interno em resposta a uma entrada. Um evento de entrada pode ser um CTI, gerado na solicitação do operador para indicar a passagem de tempo, ou uma inserção. As entradas são anotadas temporariamente.

Em contraposição, um evento de saída é simplesmente uma carga de evento. Não há oportunidade para inserir carimbo de data/hora em eventos de saída ou injetar CTIs no fluxo de saída. Os eventos de saída são gerados como eventos pontuais, com carimbos de data/hora que se baseiam nos carimbos de data/hora dos eventos de entrada correspondentes.

Manipulando a hora em um operador de fluxo definido pelo usuário

Quando você cria um novo operador de fluxo definido pelo usuário, seu código precisará processar apenas a carga dos eventos. A hora é manipulada exclusivamente pelo StreamInsight. Os eventos de entrada são recebidos na ordem. O carimbo de data/hora de cada evento de saída se baseia no carimbo de data/hora do evento de entrada correspondente. Por exemplo, se um evento de fim de borda disparar um evento de saída, este receberá o carimbo de data/hora do evento de fim de borda. Portanto, o operador pode ser influenciado pela hora, mas não pode controlá-la.

O operador de fluxo definido pelo usuário não recebe CTIs diretamente do fluxo de entrada no método ProcessEvent(), mas tem capacidade de reagir à passagem de hora através da propriedade NextCti. Essa propriedade é sondada pelo mecanismo após cada chamada para ProcessEvent(). O operador de fluxo definido pelo usuário pode retornar um carimbo de hora que indica o próximo carimbo de data/hora do CTI que ele receberá como chamada para ProcessEvent().

Somente os CTIs que foram solicitados através da definição da propriedade NextCti serão passados para ProcessEvent. Esses CTIs não serão propagados fora do operador de fluxo definido pelo usuário.

Implementando um operador de fluxo definido pelo usuário

Para criar um novo operador de fluxo definido pelo usuário, derive uma nova classe das classes base abstratas CepPointStreamOperator ou CepEdgeStreamOperator.

  • Se você derivar da classe base abstrata CepPointStreamOperator, o operador verá os eventos de entrada como eventos pontuais. No entanto, isso não será um erro se os eventos não forem pontuais. O operador verá somente as horas de início.

  • Se você derivar da classe base abstrata CepEdgeStreamOperator, o operador verá as bordas de início e de fim dos eventos de entrada.

Na classe derivada, você substitui as seguintes propriedades e métodos:

  • Método ProcessEvent. Gera a saída e atualiza o estado interno do operador em resposta a cada evento de entrada. ProcessEvent recebe um evento de entrada e pode retornar zero ou mais cargas de saída.

  • Propriedade IsEmpty. Indica se o estado interno do operador é vazio. Quando for true, o mecanismo de consulta do StreamInsight poderá descartar a instância do operador para reduzir a utilização da memória.

  • Se desejar, o método NextCti. Indica o próximo momento determinado em que um evento CTI será enviado ao operador. A substituição dessa propriedade permite que o operador definido pelo usuário produza a saída em determinado momento no futuro ou indique que seu estado interno é vazio após a decorrência de certo intervalo de tempo do aplicativo.

A classe derivada também deve implementar a serialização WFC. Para obter mais informações, consulte Como criar um contrato de dados básico para uma classe ou estrutura

Como o mecanismo StreamInsight interage com o operador

Para cada instância do operador, o método ProcessEvent é chamado com eventos na ordem da hora de sincronização. Para um evento pontual ou CTI, a hora de sincronização é a hora de início válida. Para um evento de borda, a hora de sincronização é a hora de início válida para bordas de início ou a hora de término válida para bordas de fim.

Após cada chamada para o método ProcessEvent, as propriedades IsEmpty e NextCti são sondadas.

Quando o operador substitui NextCti, o mecanismo garante que o próximo evento processado pelo operador será um evento de inserção com uma hora de sincronização menor que o valor NextCti ou um CTI com o valor NextCti como hora de início. Se o operador retornar um valor NextCti menor ou igual à hora de sincronização do último evento processado, ele será ignorado. A propriedade NextCti permite que o operador “traduza” o progresso da hora no fluxo de entrada em seu próprio ritmo (no formato desses CTIs internos) e reaja a esse progresso apropriadamente.

Os operadores são ativados em resposta somente aos eventos de inserção. Os CTIs não disparam a ativação. Um operador é desativado quando retorna true de IsEmpty.

Em qualquer ponto, o mecanismo pode optar por serializar o operador e liberar sua referência para a ele. Quando o operador é desserializado posteriormente, espera-se encontrá-lo no local onde ele foi deixado.

Exemplos de operadores de fluxo definidos pelo usuário

Suavização exponencial

Esse operador de fluxo definido pelo usuário trata um fluxo de eventos pontuais como uma sequência de valores e aplica a suavização exponencial. Observe que é necessária uma referência a System.Runtime.Serialization.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

Correspondência de padrões

Este exemplo simples de correspondência de padrões ilustra um uso alternativo de IsEmpty e NextCti. Neste exemplo, o operador procura um evento com o valor 1.0 que não seja seguido por um evento com o valor 2.0 dentro de trinta segundos. (Este exemplo é fornecido para ilustrar conceitos úteis em operadores de fluxo definidos pelo usuário. Em um aplicativo real, esse padrão é simples o suficiente para ser implementado através de operadores internos do StreamInsight.)

O exemplo anterior usou NextCti para controlar o tempo de vida de um operador. Esse exemplo também usa NextCti para esta finalidade, mas, além disso, usa NextCti para produzir a saída em resposta à passagem de tempo.

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

Definindo um método auxiliar para simplificar o uso

Talvez seja necessário simplificar o uso do operador em uma consulta. Por exemplo, seria mais conveniente para o autor da consulta escrever input.Smooth(0.5) do que input.Scan(new SmoothingOperator(0.5)).

Você pode habilitar esse padrão simplificado criando um método de extensão personalizado como o seguinte:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }