次の方法で共有


StreamInsight

イベント ストリームを使いこなす: 高速概算集計

Michael Meijer

サンプル コードのダウンロード

 

クリックストリーム、センサー データ、クレジットカードの取引データ、インターネット トラフィックなど、大量のイベントがほぼ無限に続くストリームがあるとします。このようなストリームは、すべてのイベントを格納したり、複数回のパスで分析したりすることは不可能です。最新の数イベントを含むウィンドウを使って分析を簡略化するのはどうでしょう。

ストリームの中で最も新しいイベント N 個を含む大きなウィンドウの中から、関心のあるイベントの数をカウントするとします。最も簡単なのは、N 個すべてのイベントをメモリ内に収め、すべてを反復してカウントする方法です。新しいイベントが到着するとウィンドウがスライドし、最も古いイベントの有効期限が切れ、新しいイベントが挿入されます。新しいウィンドウで最初からカウントし直すと、最初と最後以外の N-2 個のイベントをカウントし直すという無駄な時間が費やされることになります。これでは面倒です。今回は、このようなやり方で必要になる細かい処理に費やされるメモリ領域や処理時間を削減しながら、一般的なハードウェアで数千イベント/秒のイベント レートをサポートするデータ構造について説明します。また、マイクロソフトのストリーミング データ プロセッサ StreamInsight 2.1 向けに、ユーザー定義ストリーム演算子にこのデータ構造を埋め込む方法を C# で示します。今回は中級レベルのプログラミング スキルが必要で、StreamInsight の経験があるとさらに役立つでしょう。

集計とは

StreamInsight について説明する前に、集計という一見ささいな問題について考えてみます。説明を簡単にするために、ストリーム内で関心のないイベントはペイロードとして 0 を持ち、関心のあるイベントはペイロードとして 1 を持つとします (何に "関心がある" かは問いません)。最も新しい N 個のイベントを含み、カウントベース (固定サイズ) のウィンドウで、1 の数をカウントします。最もありふれたやり方をすれば、O(N) の時間と領域が必要になります。

勘の鋭い読者なら、連続する各ウィンドウ間でカウントを保持し、新しく 1 がきたらをカウントを増やし、期限切れになるのが 1 だったらカウントを減らして、処理済みの N-2 個のイベントはウィンドウ間で共有する方法をお考えになるかもしれません。すばらしい考え方です。これでカウントの管理に必要な時間は、O(1) になります。しかし、期限が切れるイベントをカウントから減らすべきかどうか判断できるでしょうか。実際のイベントを把握しなければ、カウントを管理できません。残念ながら、期限切れになるイベントを把握するには、メモリ内にウィンドウをすべて保持しておく必要があり、O(N) の領域は必要です。別の考え方として、関心のないイベントをフィルターで除外し、残りの関心のあるイベントのみをカウントする方法があります。しかし、これでも計算処理の複雑さは緩和されず、ウィンドウのサイズが可変になるという問題が生じます。

このように面倒なメモリを使いこなすことは可能でしょうか。もちろんできます。ただし、正確さを犠牲にし、処理時間とメモリ領域の間で妥協が必要です。Mayur Datar、Aristides Gionis、Piotr Indyk、および Rajeev Motwani の各氏は、先進的な論文「Maintaining Stream Statistics over Sliding Windows」(スライド ウィンドウでストリーム統計を管理する、stanford.io/SRjWT0、英語) で、指数ヒストグラムというデータ構造について説明しています。このデータ構造では、有限相対誤差 ε 以内で最新 N 個のイベントの概算カウントを保持します。つまり、常に次の条件が維持されます。

|exact count – approximate count|  ≤ ε, where 0 < ε < 1 
       exact count

概念的には、ヒストグラムは複数のバケットにイベントを格納します。各バケットは最初にイベントを 1 つ含み、カウント 1 と、イベントのタイムスタンプを保持します。イベントが到着すると、期限が切れるバケット (期限が切れるイベントを含むバケット) を削除します。到着したイベントが関心のあるイベントの場合のみ、バケットを作成します。時間の経過と共に複数のバケットが作成されるので、メモリを節約するためにマージします。バケットは、最新のバケットから最後のバケットまで、カウントが指数的 (1, 1, ..., 2, 2, ..., 4, 4, ..., 8, 8) に増加するようにマージします。そうすると、サイズ N のウィンドウ内でのバケット数は対数的になります。もっと正確に言うと、O(1⁄ε  log N) の時間と領域を保持する必要があります。最後のバケット以外は、期限が切れていないイベントのみを含みます。最後のバケットには、期限が切れていないイベントを少なくとも 1 つ含みます。カウントは推定値となるため、全体のカウントを概算する際に誤差が生じます。したがって、最後のバケットは、相対誤差を上限として、十分小さく保つ必要があります。

ここからは、C# での指数ヒストグラムの実装を示しますが、数学については最低限の説明にとどめます。難解な詳細については前述の論文をお読みください。コードを示し、例を図解します。ヒストグラムは、後半で開発する StreamInsight ユーザー定義ストリーム演算子のビルディング ブロックになります。

バケットにするかどうか

以下は、Bucket クラスです。

[DataContract]
public class Bucket
{
  [DataMember]
  private long timestamp;
  [DataMember]
  private long count;
  public long Timestamp {
    get { return timestamp; }
    set { timestamp = value; } }
  public long Count { get { return count; } set { count = value; } }
}

クラスには、(関心のある) イベントのカウントと、最新イベントのタイムスタンプを含めます。前述のように、最後のバケットだけに期限が切れるイベントを含むことができますが、ほかのバケットはイベントを少なくとも 1 つ含み、すべて期限が切れていないイベントである必要があります。したがって、最後のバケット以外のカウントは正確です。最後のバケットのカウントは、ヒストグラムから推定する必要があります。期限が切れるイベントのみを含むバケットは、バケット自体が期限切れになり、ヒストグラムから削除することができます。

指数ヒストグラムは、2 つの操作だけを使用して、最新 N 個のイベントの中で関心のあるイベントのカウントが相対誤差の上限 ε を超えないことを保証します。1 つ目の操作は、新しいイベントと期限が切れるイベントでヒストグラムを更新し、バケットのメンテナンスを行います。2 つ目の操作は、バケットの概算カウントを照会します。ヒストグラム クラスの概要を、図 1 に示します。バケットのリンク リスト以外の主な変数には、ウィンドウのサイズ (n)、相対誤差の上限 (epsilon)、およびすべてのバケット カウントをキャッシュした合計 (total) があります。コンストラクターでは、ウィンドウ サイズ、相対誤差の上限を指定して、空のリストを初期値とするバケットが設定されます。

図 1 指数ヒストグラム クラスの概要

[DataContract]
public class ExponentialHistogram
{
  [DataMember]
  private long n;
  [DataMember]
  private double epsilon;
  [DataMember]
  private long total;
  [DataMember]
  private LinkedList<Bucket> buckets;
  public ExponentialHistogram(long n, double epsilon)
  {
    this.n = n;
    this.epsilon = epsilon;
    this.buckets = new LinkedList<Bucket>();
  }
  public void Update(long timestamp, bool e) { ... }
  protected void ExpireBuckets(long timestamp) { ... }
  protected void PrependNewBucket(long timestamp) { ... }
  protected void MergeBuckets() { ... }
  public long Query() { ... }
}

ヒストグラムのメンテナンスは、次の Update メソッドで実行します。

public void Update(long timestamp, bool eventPayload)
{
  RemoveExpiredBuckets(timestamp);
  // No new bucket required; done processing
  if (!eventPayload)
    return;
  PrependNewBucket(timestamp);
  MergeBuckets();
}

実測時間ではなく、離散時間のタイムスタンプを受け取り、最新 N 個のイベントを決定します。このタイムスタンプを使用して、期限が切れるバケットを見つけて削除します。新しいイベントのペイロードが 0 (false) の場合、処理は行いません。新しいイベントのペイロードが 1 (true) の場合、新しいバケットを作成し、バケットのリストに追加します。工夫しているのは、バケットのマージです。Update メソッドから呼び出されているメソッを順番に説明します。

以下は、バケットを削除するコードです。

protected void RemoveExpiredBuckets(long timestamp)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires if its timestamp
  // is before or at the current timestamp - n
  while (node != null && node.Value.Timestamp <= timestamp - n)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}

削除の処理は、最も古い (最後の) バケットから始め、期限切れにならないバケットを最初に見つけた時点で終わります。最新イベントのタイムスタンプが期限を過ぎた各バケット (現在のタイムスタンプからウィンドウ サイズを引いた値よりも小さいタイムスタンプを持つバケット) を、リストから削除します。この比較に離散型タイムスタンプを使用します。すべてのバケット カウントの合計 (total) から、期限が切れるバケットのカウントを減算します。

期限が切れるイベントとバケットを処理したら、新しいイベントを処理します。

protected void PrependNewBucket(long timestamp)
{
  Bucket newBucket = new Bucket()
  {
    Timestamp = timestamp,
    Count = 1
  };
  buckets.AddFirst(newBucket);
  total++;
}

ペイロードが 1 (true) のイベントの新しいバケットは、カウント 1 と現在のタイムスタンプを指定して作成します。新しいバケットをバケットのリストに追加し、すべてのバケット カウントの合計 (total) を 1 つ増やします。

メモリ領域の節約と誤差の上限の処理は、バケットのマージで行います。このコードを、図 2 にリストします。バケットは、連続するバケットのカウントが 1, 1, ..., 2, 2, ..., 4, 4, ..., 8, 8 と指数的に増加するようにマージします。同じカウントを持つバケットの数は、相対誤差の上限 ε の選択によって決まります。バケットの総数はウィンドウのサイズ n とともに対数的に増加するため、メモリ領域が節約されます。マージできるバケットの数に制限はありませんが、最後のバケットのカウントは相対誤差の上限を超えないように (他のバケット カウントの合計と比較して) 十分小さく保ちます。

図 2 ヒストグラムでのバケットのマージ

protected void MergeBuckets()
{
  LinkedListNode<Bucket> current = buckets.First;
  LinkedListNode<Bucket> previous = null;
  int k = (int)Math.Ceiling(1 / epsilon);
  int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);
  int numberOfSameCount = 0;
  // Traverse buckets from first to last, hence in order of
  // descending timestamp and ascending count
  while (current != null)
  {
    if (previous != null && previous.Value.Count == current.Value.Count)
      numberOfSameCount++;
    else
      numberOfSameCount = 1;
    // Found k/2+2 buckets of the same count?
    if (numberOfSameCount == kDiv2Add2)
    {
      // Merge oldest (current and previous) into current
      current.Value.Timestamp = previous.Value.Timestamp;
      current.Value.Count = previous.Value.Count + current.Value.Count;
      buckets.Remove(previous);
      // A merged bucket can cause a cascade of merges due to
      // its new count, continue iteration from merged bucket
      // otherwise the cascade might go unnoticed
      previous = current.Previous;
    }
    else
    {
      // No merge, continue iteration with next bucket 
      previous = current;
      current = current.Next;
    }
  }
}

もっと正式に言うと、リストの最初 (最新) から最後 (最古) までのバケットのカウントが減少しないようにします。バケット カウントを 2 のべき乗に制限します。k = 1⁄ε と k⁄2 を整数値にするか、後者を置き換えます。最後のバケットのカウントを除き、最小でも k⁄2 個、最大でも k⁄2 + 1 個のバケットのカウントが同じになるようにします。同じカウントのバケットが k⁄2 + 2 個になる場合は、常に、最も古い 2 つのバケットを 1 つのバケットにマージし、最も古いバケットのカウントの 2 倍のカウントと、最新のタイムスタンプを含めます。2 つのバケットをマージしたら、必ず、マージしたバケットから処理を続行します。このマージは、マージの連鎖を引き起こす可能性があります。マージした場合以外は、次のバケットから処理を続行します。

カウントの概算がどのようになるかを理解するために、ヒストグラムの Query メソッドをご覧ください。

public long Query()
{
  long last = buckets.Last != null ? buckets.Last.Value.Count : 0;
  return (long)Math.Ceiling(total - last / 2.0);
}

最後のバケットまでのバケット カウントの合計は正確です。最後のバケットは、最低でも 1 つ期限が切れていないイベントを含む必要があり、すべてのイベントの期限が切れると、そのバケットは期限切れで削除されます。カウントは、期限が切れるイベントを含む可能性があるために推定値になります。最後のバケットの実際のカウントを最後のバケットのカウントの半分と推定すれば、この推定の絶対誤差はバケットのカウントの半分を超えることはありません。全体のカウントは、すべてのバケット カウントの合計 (total) から、最後のバケットのカウントの半分を差し引くことで推定します。絶対誤差が相対誤差の上限を超えないようにするには、最後のバケットによる影響を、他のバケット カウントの合計に比べて十分小さくする必要があります。このことは、マージのプロシージャで保証しています。

ここまでのコードと説明では、ヒストグラムの動作に混乱を生じたでしょうか。次の例をご覧ください。

ウィンドウ サイズ n = 7、相対誤差の上限 ε = 0.5 (つまり、k = 2) を指定して新しく初期化したヒストグラムを考えてみます。このヒストグラムは、図 3 のようになります。このヒストグラムを図解したのが図 4 です。図 3 では、タイムスタンプ 5、7、および 9 でマージが行われています。タイムスタンプ 9 ではマージが連鎖して行われています。期限が切れるバケットは、タイムスタンプ 13 のバケットです。このバケットについて、この後詳しく説明します。

図 3 指数ヒストグラムの例

タイムスタンプ イベント

バケット (タイムスタンプ、カウント)

最新 “ … “ 最古

合計 クエリ 正確な値

相対

誤差

1 0   0 0 0 0
2 1 (2,1) 1 1 1 0
3 1 (3,1) “ (2,1) 2 2 2 0
4 0 (3,1) “ (2,1) 2 2 2 0

5

(マージ)

1 (5,1) “ (3,1) “ (2,1) 3 2 3 0.333...
(5,1) “ (3,2)
6 1 (6,1) “ (5,1) “ (3,2) 4 3 4 0.25

7

(マージ)

1 (7,1) “ (6,1) “ (5,1) “ (3,2) 5 4 5 0.2
(7,1) “ (6,2) “ (3,2)
8 1 (8,1) “ (7,1) “ (6,2) “ (3,2) 6 5 6 0.166...

9

(マージ)

(連鎖マージ)

1 (9,1) “ (8,1) “ (7,1) “ (6,2) “ (3,2) 7 5 6 0.166...
(9,1) “ (8,2) “ (6,2) “ (3,2)
(9,1) “ (8,2) “ (6,4)
10 0 (9,1) “ (8,2) “ (6,4) 7 5 5 0
11 0 (9,1) “ (8,2) “ (6,4) 7 5 5 0
12 0 (9,1) “ (8,2) “ (6,4) 7 5 4 0.25
13 0 (9,1) “ (8,2) 3 2 3 0.333...

A Schematic Overview of the Histogram Depicted in Figure 3
図 4 ヒストグラム (図 3) の図解

最初のイベントでは動きがありません。5 個目のイベントで同じカウント 1 を含むバケット数が k⁄2 + 2 個になるため、最も古いバケットのマージが行われます。7 個目のイベントでも再びマージが行われます。9 個目のイベントでもマージが行われますが、このマージによって新たなマージが連鎖的に引き起こされます。7 個目のイベントの後、最初のイベントの期限が切れます。12 個目のイベントまでは、タイムスタンプが期限切れになるバケットはありません。13 個目のイベントで、タイムスタンプ 6 のバケットは、期限が切れていないイベントを最低でも 1 つ含むという条件が成立しなくなるため、バケット自体が期限切れになります。この表の相対誤差は、相対誤差の上限より小さいことは明らかです。

図 4 の点線で囲んだ範囲はその時点のウィンドウ サイズです。ウィンドウは複数のバケットを含み、対象とするイベントの範囲を示しています。実線で囲んだ範囲がバケットで、上部にタイムスタンプ、下部にカウントを示しています。状況 A は、タイムスタンプ 7 の時点のヒストグラムを示し、カウントしたイベントを矢印で示しています。状況 B は、タイムスタンプ 9 の時点のヒストグラムを示し、最後のバケットに期限切れのイベントが含まれることを表しています。状況 C は、タイムスタンプ 13 の時点のヒストグラムを示し、タイムスタンプ 6 のバケットが期限切れになったことを表しています。

これらをすべて 1 つにまとめ、指数ヒストグラム用に小さなデモ プログラムを作成しました (この記事に添えたコード サンプルでソース コードを確認してください)。デモ プログラムの実行結果を図 5 に示します。カウントベース ウィンドウのサイズ N は 100 万個のイベントを表し、1 億個のイベントから成るストリームのシミュレーションを行っています。各イベントのペイロードが 0 になるか 1 になるかの確率は 50% です。相対誤差の上限 ε は任意に 1% (精度 99%) を選択して、ペイロード 1 のイベントの概算カウントを推定しています。ヒストグラムのメモリ節約量はウィンドウに比べて大きく、バケット数はウィンドウに含まれるイベント数に比べて非常に少なくなっています。Intel 2.4 GHz デュアルコア プロセッサと 3 GB の RAM を搭載するコンピューターで Windows 7 を実行している場合、10 万イベント/秒のイベント レートが実現されました。

Empirical Results for the Exponential Histogram
図 5 指数ヒストグラムの実験結果

洗練された StreamInsight

Microsoft StreamInsight とはどのようなもので、今回のテーマにどのように利用できるのでしょう。ここからは StreamInsight の基礎をいくつか紹介します。StreamInsight とは、堅牢でパフォーマンスが高く、オーバーヘッドが少ないうえにほぼ待ち時間がない、柔軟性が極めて高いストリーム処理エンジンです。現バージョンは 2.1 です。試用版もありますが、完全版には SQL Server のライセンスが必要です。このエンジンは、スタンドアロンのサービスとしても、埋め込み型のインプロセスとしても実行できます。

ストリーミング データ処理の中心となるのは、イベントの時系列ストリームによるモデルです。概念的には、時間の経過と共に到着し、ほぼ無限に続く大量のデータ コレクションです。株価、天気観測データ、電力使用量、Web のクリック数、インターネット トラフィック、高速道路の料金所などが一例です。ストリームに含まれる各イベントには、メタデータとデータのペイロードを含むヘッダーがあります。イベントのヘッダーでは、最低限、タイムスタンプを保持します。イベントには、一定のペースで到着するもの、断続的に到着するもの、1 秒間に大量かつ突発的に到着するものもあります。その形式には、ある時点に限定されるもの、特定の間隔で有効になるもの、不定期 (エッジ) に有効になるものがあります。ストリームに含まれるイベント以外に、Common Time Increment (CTI) という特殊な区切りイベントがあり、これはエンジンが発行します。この CTI のタイムスタンプより前のタイムスタンプを指定して、イベントをストリームに挿入することはできません。CTI イベントは、事実上、不規則に到着するイベントの範囲を決めます。ありがたいことに、このようなことは StreamInsight が行います。

さまざまな種類の入力ソースや出力ストリームのシンクは、なんらかの方法でこのようなモデルに対応する必要があります。(クエリ可能な) 時系列ストリームに含まれるイベントは、IQStreamable<TPayload> でキャプチャされます。イベントのペイロードは、概念的には、列挙によって取り出され、監視によってストリームに書き込まれます。したがって、基になるデータは、それぞれ IEnumerable<T>/IQueryable<T> (Reactive Extension) または IObservable<T>/IQbservable<T> (Reactive Extension) で公開でき、公開するデータの型を指定してパラメーター化できます。これらのインターフェイスにより、時系列の側面の管理が処理エンジンに委ねられ、さまざまなインターフェイスとの間で相互に変換されます。

ソースとシンクはインターフェイスの境界上で機能しますが、実際の処理はクエリ内で行われます。クエリは、LINQ で記述する構成要素の基本単位です。1 つ以上のストリームに含まれるイベントを連続的に処理し、別のストリームに出力します。クエリは、イベントのプロジェクト、フィルター、グループ適用、マルチキャスト、演算/集計、結合、ユニオン、およびウィンドウに使用します。演算子は、ユーザーが定義できます。演算子は、イベントが到着するときに、イベント (増分) またはウィンドウ (非増分) に作用します。

ウィンドウ分割は順番に行われます。ウィンドウを作成するには、ストリームをイベントの有限なサブセットに分割します。このサブセットは、連続するウィンドウ間でオーバーラップすることができます。ウィンドウの作成により、事実上ウィンドウのストリームが生成されます。これは、StreamInsight の IQWindowedStreamable<TPayload> によって行われます。現時点では、カウントベースのウィンドウ、時間ベースのウィンドウ、およびスナップショット ウィンドウの 3 種類のウィンドウ作成の考え方がサポートされています。カウントベースのウィンドウは、最新の N 個のイベントを含み、新しいイベントが到着するとスライドし、最も古いイベントを期限切れにして最新イベントを挿入します。時間ベースのウィンドウは、最新の時間間隔内に発生した最新イベントを含み、一定間隔でスライドします (ホッピングやタンブリングとも呼ばれます)。スナップショット ウィンドウは、イベントの開始時刻と終了時刻に従ってウィンドウが設定されます。最も近いイベントの開始時刻と終了時刻のペアごとにウィンドウが作成されます。イベントとは無関係に、タイムラインに沿った間隔によって設定される時間ベースのウィンドウとは異なり、スナップショット ウィンドウはタイムラインに沿って固定されません。

ここで取り上げたのは、ごく基本的なことです。詳細については、オンラインの開発者ガイド (https://msdn.microsoft.com/ja-jp/library/ee391564(v=sql.111).aspx)、「A Hitchhiker’s Guide to StreamInsight 2.1 Queries」(StreamInsight 2.1 クエリのヒッチハイク ガイド、bit.ly/NbybvY、英語)、CodePlex の例、StreamInsight チーム ブログ (blogs.msdn.com/b/streaminsight、英語) などのいくつかのソースをご覧いただけます。

すべてを 1 つに組み立てる

これで基礎ができました。でもまだ、カウントの概算計算を StreamInsight でどのように活用できるか疑問に感じているでしょう。簡単に言うと、ペイロードに 0 か 1 を含み、特定時点に発生するイベントの (時系列) ソース ストリームが必要です。このストリームを、指数ヒストグラムを使って最新 N 個のイベントの中からペイロードが 1 のイベントの数を概算カウントするクエリに渡します。このクエリは、カウントの概算値を含み、特定時点に発生するイベントの (時系列) ストリームを生成し、シンクに設定します。

まず、カウントの概算値を求めるユーザー定義演算子を用意します。カウントベースのウィンドウを使って最新 N 個のイベントを取り出そうと考えていますか。考え直してください。それでは、メモリを節約する指数ヒストグラムのメリットが損なわれることになります。なぜなら、このウィンドウの考え方では、イベントのウィンドウ全体が強制的にメモリに保持されるためです。指数ヒストグラムでは、バケットを管理することでウィンドウと同じ概念を実現しているため、すべてをメモリに保持する必要はありません。さらに、複数のウィンドウをまたがる演算子を用意すると増分ではなくなります。つまり、イベントの到着時には処理が行われず、(次の) ウィンドウが利用できるようになったときにのみ処理が可能です。解決策は、クエリに明示的なウィンドウの考え方を導入しないユーザー定義ストリーム演算子を用意することです。このコードを、図 6 にリストします。

図 6 ユーザー定義ストリーム演算子の実装

[DataContract]
public class ApproximateCountUDSO : CepPointStreamOperator<bool, long>
{
  [DataMember]
  private ExponentialHistogram histogram;
  [DataMember]
  private long currentTimestamp;  // Current (discrete) timestamp
  public ApproximateCountUDSO(long n, double epsilon)
  {
    histogram = new ExponentialHistogram(n, epsilon);
  }
  public override IEnumerable<long> ProcessEvent(
    PointEvent<bool> inputEvent)
  {
    currentTimestamp++;
    histogram.Update(currentTimestamp, inputEvent.Payload);
    yield return histogram.Query();
  }
  public override bool IsEmpty
  {
    get { return false; }
  }
}

演算子は、抽象クラスの CepPointStreamOperator<TInputPayload, TOutputPayload> から派生します。指数ヒストグラムのインスタンス変数を用意します。DataContract 属性と DataMember 属性で装飾します。これは、たとえば回復性を目的に、StreamInsight に演算子をシリアル化する方法を指示します。演算子は、IsEmpty 演算子をオーバーライドし、演算子が空ではない (ステートフルである) ことを示します。これにより、StreamInsight がメモリ使用量を最小化する際に、この演算子で混乱を生じることを防ぎます。ProcessEvent メソッドが、この演算子の中核部分です。現在の (離散) タイムスタンプを増やし、イベント ペイロードと共にヒストグラムの更新メソッドに渡します。ヒストグラムは、バケットを処理し、カウントの概算値をクエリします。必ず yield return 構文を使用し、演算子を列挙可能にします。演算子は、一般にユーティリティ クラスに隠ぺいされたいくつかの拡張メソッドにラップされます。これをどのように行うかを、次のコードで示します。

public static partial class Utility
{
  public static IQStreamable<long> ApproximateCount(
    this IQStreamable<bool> source, long n, double epsilon)
  {
    return source.Scan(() => new ApproximateCountUDSO(n, epsilon));
  }
}

これで作業は完了です。拡張メソッドで演算子をクエリにプラグインします。この使い方を実際に示すため、いくつかの追加コードが必要です。次にちょっとしたソース ストリームを示します。

public static partial class Utility
{
  private static Random random = new Random((int)DateTime.Now.Ticks);
  public static IEnumerable<bool> EnumeratePayloads()
  {
    while (true)  // ad infinitum
    {
      bool payload = random.NextDouble() >= 0.5;
      yield return payload;
    }
  }
}

このコードは、ペイロードに 0 と 1 をランダムに生成します。yield return 構文により、列挙可能なソースに変換します。必要に応じて、ユーティリティ クラスに配置します。

悪名高い Program クラスを図 7 に示します。このクラスは、StreamInsight のインプロセス埋め込みサーバー インスタンスを作成します。ApproximateCountDemo というアプリケーション インスタンスが、名前付きストリーム、クエリなどのためのストリーミング処理 (メタデータ) コンテナーとして作成されます。前述のペイロードを生成するユーティリティ メソッドを使用して、特定時点に発生するイベントの列挙可能なソースを定義し、それを特定時点に発生するイベントの時系列ストリームに変換します。クエリを LINQ で定義し、ソース ストリームに対して概算カウントを計算する演算子を選択します。ここでは、ユーザー定義演算子用の拡張メソッドが便利です。ウィンドウ サイズと相対誤差の上限とともにブートストラップします。次に、クエリ出力を列挙可能なシンクに変換し、時系列のプロパティを取り除きます。最後に、シンクを反復処理し、パイプラインを通じてイベントをアクティブに取り出します。プログラムを実行し、画面に数値結果が出力されるのを試してみてください。

図 7 StreamInsight での埋め込みと実行

class Program
{
  public const long N = 10000;
  public const double Epsilon = 0.05;
  static void Main(string[] args)
  {
    using (Server server = Server.Create("StreamInsight21"))
    {
      var app = server.CreateApplication("ApproximateCountDemo");
      // Define an enumerable source
      var source = app.DefineEnumerable(() =>
        Utility.EnumeratePayloads());
      // Wrap the source in a (temporal) point-in-time event stream
      // The time settings determine when CTI events
      // are generated by StreamInsight
      var sourceStream = source.ToPointStreamable(e =>
        PointEvent.CreateInsert(DateTime.Now, e),
        AdvanceTimeSettings.IncreasingStartTime);
      // Produces a stream of approximate counts
      // over the latest N events with relative error bound Epsilon
      var query =
        from e in sourceStream.ApproximateCount(N, Epsilon) select e;
      // Unwrap the query's (temporal) point-in-time
      // stream to an enumerable sink
      var sink = query.ToEnumerable<long>();
      foreach (long estimatedCount in sink)
      {
        Console.WriteLine(string.Format(
          "Enumerated Approximate count: {0}", estimatedCount));
      }
    }
  }
}

簡単にまとめると、今回は指数ヒストグラム データ構造を使って、誤差に上限を設け、対数的な時間と領域の中でイベントのウィンドウに含まれるカウントを概算する方法を説明しました。このヒストグラムは、StreamInsight のユーザー定義演算子に埋め込まれます。

ヒストグラムと演算子は、時間ベースのウィンドウなど、可変サイズのウィンドウをサポートするように拡張できます。これには、ヒストグラムがウィンドウ サイズではなく、ウィンドウの間隔/タイムスパンを認識する必要があります。バケットのタイムスタンプが、新しいイベントのタイムスタンプからウィンドウのタイムスパンを差し引いたものより前になると、そのバケットは期限切れになります。分散を計算するための拡張は、Brian Babcock、Mayur Datar、Rajeev Motwani、および Liadan O’Callaghan の「Maintaining Variance and k–Medians over Data Stream Windows」(データ ストリーム ウィンドウにおける分散と k-メディアンの管理、stanford.io/UEUG0i、英語) という論文で示されています。この論文では、ヒストグラムとは別に、いわゆるシノプシス構造について説明されています。ランダム サンプル、ヘビー ヒッター、変位値などについて考えることができます。

今回付属のソース コードは、Visual Studio 2010 を使って C# 4.0 で作成しており、StreamInsight 2.1 が必要です。コードは、Microsoft Public License (Ms-PL) から無料で入手できます。これは教育目的に開発され、運用産環境向けに最適化されておらず、テストもされていないことに注意してください。

Michael Meijer は、CIMSOLUTIONS BV のソフトウェア エンジニアであり、オランダ各地の企業に IT コンサルティング サービスとソフトウェア開発ソリューションを提供しています。彼は、オランダ、エンスヘーデのトゥウェンテ大学で研究する中で StreamInsight とストリーミング データの処理に関心を抱き、コンピューター サイエンス情報システム エンジニアリングの科学修士号を取得しました。

この記事のレビューに協力してくれた技術スタッフの Erik Hegeman、Roman Schindlauer、および Bas Stemerdink に心より感謝いたします。