タイム スタンプの変更
このトピックの例では、演算子を使用してイベントのタイムスタンプを変更する方法を示します。イベントのタイムスタンプを変更することによって、結合、ウィンドウでの集計など、後続の演算でのイベントの効力を変更できます。次の LINQ 拡張メソッドは、この機能を実装します。
イベントの時刻の変更
ShiftEventTime() 演算子は、指定された式に従って、ストリーム内の各イベントの開始時刻を変更します。
次の例では、ストリーム内の各イベントの時刻を 15 分進めています。
// shift events by 15 minutes into the future.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromMinutes(15));
次の例では、ストリーム内の各イベントの時刻を 1 時間戻しています。
// shift events by 1 hour into the past.
var shifted = inputStream.ShiftEventTime(e => TimeSpan.FromHours(-1));
時間の変更を指定する式は、現在のイベントの開始時刻を参照することができますが、その終了時刻またはペイロードを参照できません。変更により、イベントの存続期間またはペイロードに影響が及ぶことはありません。
値 DateTime.MinValue は、負の無限大の時刻値をエンコードすると見なされます。指定された式 (定数ではなく) で、この値を持つイベント開始時刻が参照されていた場合、その式は評価されず、開始時刻は DateTime.MinValue のままになります。それ以外の場合は式が実行時に評価されますが、オーバーフロー例外になる可能性があります。
ShiftEventTime はストリーム内のすべてのイベントの開始時刻に影響するため、指定した時間の変更は、この演算子を経由して CTI イベントにも適用されることに注意してください。
イベントの期間の変更
AlterEventDuration() 演算子は、イベントの存続期間を変更します。イベントの存続期間とは、イベントが有効である期間を示します。この期間は、イベントの開始時刻、終了時刻、またはペイロードから算出できるように、イベントの関数として定義されています。
次の例では、イベントの期間を 1 時間に設定しています。
// set event duration to 1 hour.
var onehour = inputStream.AlterEventDuration(e => TimeSpan.FromHours(1));
次の例では、イベントの期間を現在の存続期間の 2 倍に設定しています。
// double event duration.
var doubled = inputStream.AlterEventDuration(e => (e.EndTime - e.StartTime) * 2);
値 DateTime.MaxValue は、正の無限大の時刻値をエンコードすると見なされます。指定された式で、この値を持つイベント終了時刻が参照されていた場合、その式は評価されず、終了時刻は DateTime.MaxValue のままになります。
イベントの時刻と期間の同時変更
AlterEventLifetime() 演算子は、AlterEventDuration 関数と ShiftEventTime 関数の 2 つを組み合わせて、コードの表現性を高めたものです。
AlterEventLifeTime() メソッドの最初のパラメーターは、新しい開始タイムスタンプを指定し、現在のイベントの開始時刻を参照できます。このパラメーターは、UTC 時刻として指定する必要があります。2 番目のパラメーターは、新しい有効期間を指定し、現在のイベントの開始時刻、終了時刻、ペイロードの各フィールドを参照できます。
次の例では、イベントの時刻を 1 分前に戻していますが、2 番目のパラメーターに新しい存続期間を指定するときに、元の存続期間に 1 分加算しているため、イベントの終了時刻に変更はありません。
// shift event 1 minute into the past, but leave the end time (event duration) unchanged.
var newStream = inputStream.AlterEventLifetime(e => e.StartTime - TimeSpan.FromMinutes(1),
e => e.EndTime - e.StartTime + TimeSpan.FromMinutes(1));]
指定した開始時刻の変更は、この演算子を経由して CTI イベントにも適用されることに注意してください。
前の DateTime.MinValue および DateTime.MaxValue に関する説明も参照してください。
ストリームからポイント イベント ストリームへの変換
ToPointEventStream 演算子は、次の例に示されたように、エッジ イベントと期間イベントをポイント イベントに変換する (イベントの存続期間をイベントの開始時刻より 1 ティック分遅く変更する) 場合に便利な関数です。
var pointStream = inputStream.ToPointEventStream();
間隔イベントがポイント イベントに変換されるときには、イベントの開始時刻だけが保持されます。
イベントの期間のクリップ
ClipEventDuration 演算子は、2 つのストリームをパラメーターとして受け取り、1 番目のストリーム内の各イベントの存続期間を、2 番目のストリーム内の次に一致するイベントの開始時刻に従って変更します。
これまでは、固定された期間によってイベントの存続期間を変更するための演算子について説明してきました。ClipEventDuration 演算子は、他のイベントに応じてイベントの存続期間を調整する非常に柔軟な方法を提供します。通常、この演算子は、特定のストリームに対して指定され、一致条件と共に、別のストリームをパラメーターとして受け取ります。この演算子は、1 番目のストリーム内の各イベントの存続期間を、一致条件を満たす別のストリーム内の (アプリケーション時間において) "次の" イベントの開始時刻にクリップします。
たとえば、ペイロード フィールド "Id" を持つイベントを送信する 2 つのストリーム (stream1 と stream2) があるとします。次のステートメントは、stream1 内のすべてのイベントを、"Id" の値が同じである stream2 内の次のイベントにクリップします。
var clipped = stream1.ClipEventDuration(stream2, (e1, e2) => e1.Id == e2.Id);
一致条件は、両方の入力ペイロードに関する式として指定されます。このステートメントのセマンティクスを次の図に示します。
図は、stream1 内の Id が A である最初のイベントを、stream2 内の Id が A である次のイベントにクリップする方法を示しています。stream1 内の Id が B であるもう 1 つのイベントはクリップされません。これは、stream1 内のこのイベントの終了後のみに、stream2 内の次に一致するイベントが発生するためです。
このクリップ動作は、広範な適用が可能です。これによって実現できる一般的な要件の 1 つは、ポイントのストリームを連続する間隔のストリーム ("シグナル" とも呼ばれる) に変換することです。
ポイントからシグナルへの変換
この場合、まず、すべてのポイント イベントを拡張して、次のイベントに実際に達するようにする必要があります。つまり、次のイベントが発生すると想定されるときまでのイベント存続期間を決定するタイムアウトを適用する必要があります。このタイムアウトは、有限の期間にすることも無限の期間にすることもできます。ここでは、タイムアウトを 60 秒とします。
var extended = input.AlterEventDuration(e => TimeSpan.FromSeconds(60));
この準備では、ClipEventDuration 演算子を使用して、ストリーム自体をパラメーターとして指定できます。これにより、各イベントが同じストリーム内の次のイベントにクリップされ、連続する一連の間隔イベントが作成されます。クリップ操作で問題となるのは 2 番目のストリームの開始時刻だけなので、元のポイント ストリームを使用することもできます。
var signal = extended.ClipEventDuration(input, (e1, e2) => true);
ここでは、ストリーム内のすべてのイベントが 1 つのデータ ソースに関連付けられている場合のように、1 つの論理ストリームを対象としていると想定しており、一致条件は常に true に評価されます。
次の図は、ClipEventDuration 演算子によるポイントからシグナルへの変換の効果を示しています。
2 つの LINQ ステートメントを組み合わせて 1 つのステートメントにすることができます。
var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => true);
複数のデバイスの測定値や、複数の株価など、ストリームに複数の論理ストリームが含まれている場合は、ブール式で各キー (デバイス ID や銘柄記号) を照合する必要があります。
var signal = input.AlterEventDuration(e => TimeSpan.FromSeconds(60)).ClipEventDuration(input, (e1, e2) => e1.Symbol == e2.Symbol);
セッションの作成
ClipEventDuration のもう 1 つの使用法は、セッション中に発生したイベントに注釈を付けるためにセッション イベントを作成することです。次のイベント スキーマがあるとします。これは、ユーザーによる対話型操作のイベントを表します。
public class EventType
{
public int UserId;
public string Type;
public DateTime Time;
public byte[] data;
};
この例では、ペイロード フィールド Type は、"start"、"end"、または "other" のいずれかになり、それぞれ、ユーザー セッションの開始、セッションの終了、セッション中のユーザー イベントを表します。フィールド Time には対話型操作のタイムスタンプが格納され、data には詳細情報が格納されます。タスクは、イベントが発生したセッションの開始時刻の各イベントに注釈を付けることです。さらに、各セッションは 10 分後にタイムアウトするとします。
次の図は、このシナリオの一連のサンプル イベントを示しています。
まず、タイムアウトに応じた期間の拡張を、種類が "start" であるすべてのイベントに適用します。
var sessionStarts = from e in input
where e.Type == “start”
select e;
var sessionStartsExt = sessionStarts.AlterEventDuration(e => TimeSpan.FromMinutes(10));
次に、これらのセッション イベントを、ユーザー ID ごとに、各イベントの終了までにクリップする必要があります。
var sessionEnds = from e in input
where e.Type == “end”
select e;
var sessions = sessionStartsExt.ClipEventDuration(sessionEnds, (e1, e2) => e1.UserId == e2.UserId);
次の図は、これらのステートメントを示しています。
これで、セッション イベントを残りのイベントに結合できます。
var sessionActivity = from e in input
where e.Type == “other”
select e;
var annotated = from s1 in sessions
join s2 in sessionActivity
on s1.UserId equals s2.UserId
select new {
s2.UserId,
s2.Type,
s2.Time,
s2.Data,
SessionStart = s1.Time
}
この結合では、sessionActivity イベントおよびセッション イベントのフィールドを参照できるので、セッションの開始時刻を各 sessionActivity イベントに取り込んで、注釈を付けた sessionActivity イベントを作成できます。
結合条件は UserId が等しいことであるため、sessionActivity の UserId が Y であるイベントは、UserId が X であるこの特定のセッションでは考慮されません。
一連の LINQ ステートメントは、次のように、簡潔にまとめることができます。
var sessions = input
.Where(e => e.Type == “start”)
.AlterEventDuration(e => TimeSpan.FromMinutes(10))
.ClipEventDuration(input.Where(e => e.Type == “end”), (e1, e2) => e1.UserId == e2.UserId);
var annotated = from s1 in sessions
join s2 in input.Where(e => e.Type == “other”)
on s1.UserId equals s2.UserId
select new {
s2.UserId,
s2.Type,
s2.Time,
s2.Data,
SessionStart = s1.Time
}