Compartilhar via


Tempo avançado do aplicativo

Os desenvolvedores do StreamInsight devem equilibrar as necessidades de fontes de dados que possam ter dados fora de ordem com os requisitos para processar eventos de uma forma altamente dinâmica. Embora ajude a reduzir a latência mais rapidamente, adiantar a hora do aplicativo reduz a janela para dados tardios (ou seja, a capacidade de dados chegarem fora de ordem). O StreamInsight fornece várias maneiras de justificar a hora do aplicativo. Este tópico descreve os diferentes níveis e políticas de avanço do tempo do aplicativo que podem ser definidos no nível de adaptador e com associações de consulta.

Noções básicas sobre o modelo temporal

O modelo temporal do StreamInsight se baseia apenas no tempo do aplicativo e nunca no tempo do sistema. Isso significa que todos os operadores temporais referenciam o carimbo de data/hora dos eventos e nunca o relógio do sistema do computador host. Como resultado, aplicativos devem comunicar a hora do seu aplicativo atual ao servidor StreamInsight. O tempo do aplicativo para determinado aplicativo depende de muitos aspectos diferentes no contexto do aplicativo. Basicamente, é responsabilidade do desenvolvedor do aplicativo fornecer o tempo do aplicativo apropriado para o servidor StreamInsight. As considerações principais para o tempo do aplicativo são:

  • Fontes de dados

    Quando fontes de dados comunicarem informações temporais, esses dados poderão ser usados para identificar o ponto no tempo em que todos os eventos foram recebidos da fonte de dados. Esse ponto no tempo constitui a hora do aplicativo atual em relação a essa fonte de dados. Observe que fontes de dados diferentes podem continuar em velocidades diferentes.

  • Dados fora de ordem

    Com algumas fontes de dados, os eventos nem sempre chegam na ordem dos seus carimbos de data/hora. Ou seja, os dados estão fora de ordem. O StreamInsight pode acomodar dados fora de ordem e garante que os resultados não dependam da ordem na qual os eventos chegam ao servidor StreamInsight. Os desenvolvedores do StreamInsight podem adiantar a hora do aplicativo com um pouco de folga para permitir que eventos fora de ordem surjam para as fontes de dados que têm eventos tardios.

  • Dinamismo do resultado

    As consultas do StreamInsight produzem resultados que são conhecidos como precisos até a hora do aplicativo atual. Isso significa que os resultados emergem de consultas do StreamInsight à medida que eles são finalizados pelo progresso da hora do aplicativo global.

incrementos de tempo atuais

Durante o processamento da consulta, a hora do aplicativo é controlada por eventos CTI (incremento de tempo atual). Um CTI é um evento de pontuação que é um componente central do modelo temporal do StreamInsight. Os CTIs são usados para confirmar sequências de eventos e divulgar resultados computados na saída da consulta, confirmando para o servidor StreamInsight que certas partes da linha do tempo não terão mais alterações. Portanto, é crucial enfileirar os CTIs junto com eventos no fluxo de evento de entrada para gerar qualquer resultado e liberar o estado de operadores com monitoração de estado.

Ao enfileirar o CTI, a entrada promete não produzir nenhum evento subsequente que possa influenciar o período antes do carimbo de data e hora do CTI. Isto implica que, depois que um CTI foi enfileirado na entrada:

  • Para eventos do formato Ponto, Intervalo ou início de Borda: a hora de início do evento precisa ser igual ou posterior ao CTI.

  • Para eventos do formato Borda de fim: a hora de término do evento precisa ser igual ou posterior ao CTI.

Se estas regras forem violadas, estaremos falando de uma violação de CTI. Abaixo descrevemos como estas violações são tratadas.

Há três métodos para inserir CTIs em um fluxo de entrada.

  1. Enfileirar os CTIs programaticamente através do adaptador de entrada, análogo ao enfileiramento de eventos.

  2. Gerar CTIs de forma declarativa com determinada frequência. Isso pode ser especificado através de AdvanceTimeGenerationSettings na fábrica do adaptador ou como parte da associação de consulta.

  3. Definir um fluxo de entrada separado como uma fonte de CTI. Isso só pode ser especificado na associação de consulta.

Sempre que são implementados os métodos 2 e 3, uma política para violações de CTI também deve ser implementada. Na próxima seção, você encontra uma discussão sobre AdvanceTimeGenerationSettings e as políticas de violação. As seções subsequentes descrevem como usar configurações de tempo avançado na fábrica do adaptador, bem como na associação de consulta.

Geração de CTI

A geração de CTIs (descrito anteriormente, nos métodos 2 e 3) tem duas dimensões:

  1. A frequência de geração, que é especificada como um inteiro N ou como um intervalo de tempo T. A política de frequência de geração insere um CTI após a ocorrência da contagem de evento (N) ou o intervalo de tempo (T).

  2. O carimbo de data/hora dos CTIs gerados, que é especificado como um atraso em relação ao último evento recebido.

Além disso, você pode usar um sinalizador booliano para indicar se um CTI final com um carimbo de data/hora de infinidade positiva deve ser inserido quando a consulta é desativada. Isso é usado para liberar todos os eventos restantes dos operadores da consulta.

A geração de CTI é definida por meio da classe AdvanceTimeGenerationSettings, cujo construtor obtém a frequência, o atraso e o sinalizador, conforme mostrado no exemplo a seguir.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

Esse exemplo instrui o mecanismo a inserir um CTI depois de cada 10 eventos provenientes da origem do evento. O CTI transporta um carimbo de data/hora da último hora do evento menos cinco segundos. Esse mecanismo de atraso implementa efetivamente um período de cortesia de forma que a origem do evento possa enfileirar eventos tardios sem violar a semântica de CTI (desde que os eventos nunca mais atrasem mais de cinco segundos). Quando a consulta correspondente for desativada, um CTI com hora infinita será enfileirado.

Observe que, durante a especificação de uma frequência para a geração de CTI por meio de AdvanceTimeSettings, as bordas de fim não são levadas em consideração. Elas também não são consideradas durante o uso de uma duração como uma frequência. Apenas as bordas de início são levadas em consideração no caso de eventos de borda para frequência e duração.

Políticas de violação de CTI

É possível que uma origem de evento viole a semântica de CTI, enviando eventos com um carimbo de data/hora anterior aos CTIs inseridos. As configurações de hora avançada permitem a especificação de uma política para tratar tais ocorrências. A política pode ter um destes dois valores:

  • Descartar

    Eventos que violam o CTI inserido são descartados e não são enfileirados na consulta.

  • Ajustar

    Eventos que violem o CTI inserido serão modificados se o seu tempo de vida se sobrepuser ao carimbo de data e hora do CTI. Ou seja, o carimbo de data/hora de início dos eventos é definido como o carimbo de data/hora de CTI mais recente, de modo que estes eventos se tornem válidos. Se a hora de início e de término de um evento cair antes do carimbo de data/hora de CTI, o evento será descartado.

Configurações de tempo avançado do adaptador.

As configurações de tempo avançado do aplicativo podem ser especificadas na definição da fábrica do adaptador. Da mesma forma que o método Create() da fábrica é chamado sempre que é criada uma instância do adaptador, um método correspondente é chamado para definir as configurações de avanço de hora da instância do adaptador. Para fazer isso, use a interface ITypedDeclareAdvanceTimeProperties para um adaptador tipado (ou IDeclareAdvanceTimeProperties para um adaptador não tipado), conforme é mostrado no exemplo a seguir.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

Essa interface exige a implementação do método a seguir como parte da fábrica.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

O método DeclareAdvanceTimeProperties() é chamado para cada adaptador recém-instanciado com a mesma estrutura de configuração e parâmetro de forma de evento especificado na chamada de método Create() correspondente. Isso permite ao autor do adaptador derivar as configurações de geração de CTI corretas a partir das informações de configuração, sem exigir que o gravador e binder de consultas conheça as particularidades das configurações de tempo avançado.

O construtor AdapterAdvanceTimeSettings requer o objeto AdvanceTimeGenerationSettings e a política de violação descritos anteriormente.

Geração de CTI na associação de consulta

Da mesma forma que AdapterAdvanceTimeSettings, a emissão de CTIs pode ser especificada de forma declarativa na associação de consulta, conforme é mostrado no exemplo a seguir. Isso permite ao usuário que associa a consulta definir o comportamento de tempo do aplicativo independentemente da implementação do adaptador.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

O construtor AdvanceTimeSettings usa estes três argumentos:

  1. Um objeto AdvanceTimeGenerationSettings

  2. Um objeto AdvanceTimeImportSettings

  3. A política de violação

Observe que as configurações de geração ou os argumentos de configurações de importação podem ser definidos como nulos, mas não ambos. Além disso, eles podem ser especificados juntos. A próxima seção apresenta a classe AdvanceTimeImportSettings.

O exemplo anterior especifica que deve-se gerar e inserir um CTI com cada evento, com o carimbo de data/hora do evento (sem atraso). O objeto AdvanceTimeSettings pode ser passado como um último parâmetro opcional para o método CepStream.Create() conforme mostrado no exemplo a seguir.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

Ele também pode ser usado no modelo de desenvolvimento de binder de consulta:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Sincronizando com outro fluxo

Quando usados durante a associação de consulta, além de (ou em vez de) gerar os CTIs com base em uma frequência, eles podem ser copiados de outro fluxo de entrada na consulta usando AdvanceTimeImportSettings. Esse recurso habilita a sincronização de dois fluxos, conforme mostrado no exemplo a seguir.

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

Esse exemplo demonstra um caso de uso típico no qual um fluxo de dados "rápido" precisa ser unido a um fluxo de referência "lento". O fluxo lento pode corresponder a dados de pesquisa que são alterados com menor frequência do que o fluxo rápido. Para tornar a saída de produção de junção tão rápida quanto sua entrada mais rápida, o fluxo de entrada lento é sincronizado ao fluxo rápido através da importação de seus CTIs. Neste exemplo, considera-se o adaptador como local onde ocorre a manipulação do tempo do fluxo rápido do aplicativo.

Dinamismo do resultado

O parâmetro de atraso das configurações de geração de tempo avançado especifica o carimbo de data/hora dos CTIs inseridos. É importante compreender a semântica precisa de CTIs na estrutura do StreamInsight para obter o efeito desejado para o dinamismo da saída. Um CTI afirma ao mecanismo que tudo na linha do tempo estritamente antes do carimbo de data/hora do CTI está confirmado. Esteja ciente das seguintes implicações dessa diferença:

Por exemplo, considere um fluxo de entrada de eventos pontuais e uma configuração de geração de CTI com frequência 1 (cada evento) e atraso 0. Isso gera CTIs com exatamente o mesmo carimbo de data/hora de cada evento de ponto. Porém, isso significa que o último evento de ponto será confirmado apenas com o próximo CTI, pois seu carimbo de data/hora não está estritamente antes do CTI correspondente. Para confirmar cada evento de ponto assim que ele for emitido pelo adaptador, os CTIs devem receber carimbo de data/hora logo após os eventos de ponto. Isso é convertido em um atraso negativo de uma escala, conforme mostrado no exemplo a seguir.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

Os CTIs e os operadores de consulta

Os CTIs são enfileirados pelo adaptador de entrada ou injetados conforme a descrição acima. Eles se propagam pela consulta e são processados de maneira diferente por determinados operadores. Operadores de junção, por exemplo, liberam seus resultados até o CTI mais antigo de qualquer um dos lados. Os operadores de união liberam o resultado mais antigo dos CTIs mais recentes de qualquer um dos lados. A consulta inteira só liberará seu resultado somente até o CTI mais recente.

Por outro lado, determinados operadores têm um efeito em carimbos de data/hora de CTI. As janelas de salto passam os CTIs de uma janela para o início da janela porque o resultado da operação acima da janela pode ser alterado enquanto os eventos continuam entrando nessa janela. Os métodos ShiftEventTime() e AlterEventLifeTime() alteram a hora de início dos eventos, e a mesma transformação se aplicará aos CTIs.

Consulte também

Conceitos

Criando adaptadores de entrada e saída

Conceitos do servidor StreamInsight

Histórico de alterações

Conteúdo atualizado

Adicionada a seção "CTIs e operadores de consulta".

Adicionadas informações na seção "Geração de CTI" de que as bordas de fim não são levadas em conta durante a especificação de uma frequência de CTI por meio de AdvanceTimeSettings.