Azure の詳細

Microsoft Azure サービスを使用したテレメトリの取り込みと分析

**Bruno Terkaly
Ricardo Villalobos
Thomas Conte

Bruno Terkaly and Ricardo Villalobos[レギュラーのコラムニストの Bruno Terkaly と Ricardo Villalobos は、今月のコラムをゲスト コラムニストに委ねました。Bruno と Ricardo は次回のコラムで戻る予定です。編集長]

センサーを備えたデバイスはテレメトリ データを生成します。このテレメトリ データの解釈が今月のテーマです。私たちの身の回りを見てみると、たとえば、ドライバーは車のダッシュボードを見て、自分の運転が燃費や交通量に与える影響を確認します。産業界を考えると、たとえば、オペレーターはある機械の温度を工場のフロアにある別の機械の平均温度と比較して、故障するリスクを判断したり、メンテナンスの時期を予測します。

このようなシナリオでは、接続された大量のデバイスからのテレメトリ データが必要になります。もっと重要なのは、こうしたデータを分析して意味ある情報にまとめて目に見えるかたちにすることです。このような膨大な量のデータを扱う場合、Hadoop などのビッグ データのフレームワークが、インストールされたデバイスの数に応じてスケールアップできる、堅牢なデータ処理基盤を構築します。

今回は、Microsoft Azure サービス バスを使用してシンプルなテレメトリ取り込みアーキテクチャを作成する方法を取り上げます。さらに、HDInsight という Microsoft Azure Hadoop サービスを使用して、取り込んだデータをスケーラブルな方法で分析します。

ソリューションのアーキテクチャ

前回のコラムで Bruno Terkaly と Ricardo Villalobos が、サービス バスを使用してコマンド チャネルを確立し、接続されたオブジェクトと通信する方法について紹介しました。今回は、サービス バスをミドルウェア層として使用して、デバイスから送信されるテレメトリ メッセージをバッファリングします。

各デバイスはサービス バスと直接通信し、テレメトリ メッセージを専用トピックに送信します (図 1 参照)。その後、1 つまたは複数のサブスクリプションが、送信されたメッセージをキューからワーカー ロールに取り出し、Blob ストレージにフラット ファイルとして保存します。続いて、Hadoop クラスターが、保存された入力ファイルを使用して分析や計算を行います。

Basic Flow of Big Data Telemetry Solution
図 1 ビッグ データ テレメトリ ソリューションの基本的な流れ

このアーキテクチャのメリットは、さまざまな部分を互いに分離できることです。サービス バスはミドルウェアとして動作し、ワーカーの読み取り速度が遅ければ、データをバッファリングします。開発者はキューの長さを監視して、これをワーカー層の自動スケーリングの基礎として使用することができます。

サブスクリプションでは、着信データにシンプルなフィルター処理を施し、バックエンドのさまざまな処理層にルーティングすることもできます。たとえば、リアルタイムの警告システムにメッセージを送信する Urgent サブスクリプションや、すべてのデータをキャプチャし、後で分析するための Everything サブスクリプションなどが考えられます。

ワーカーはデータをストレージ (Hadoop 分散ファイル システム (HDFS) または Blob ストレージ) に移動するだけなので、Hadoop の処理部分とは分離できます。つまり、データの着信間隔とは無関係に実行できます。Hadoop クラスターを永遠に実行し続けることも選択できます。その結果、小さなバッチを絶えず処理して、演算の遅れを減らすことができます。また、毎日 1 回だけ HDInsight クラスターを起動して、1 回のバッチですべての計算を実行してコストの削減を図ることもできます。さらには、両者を混在させて利用することも可能です。

サービス バスを使用したテレメトリ データの取り込み

Azure サービス バスでは、HTTP または AMQP のいずれかのプロトコルでトピックにメッセージを送信します。接続型のデバイスの多くは帯域幅に制限があるため、AMQP の方がいくぶんメリットがあります。AMQP は、効率がよく、信頼性が高い、移植可能なバイナリ形式のプロトコルです。多くの言語、ランタイム環境、および OS 用にライブラリも用意されています。そのため、デバイスをサービス バスに直接接続してテレメトリ メッセージを送信する際に柔軟性が得られます。

このアプローチをテストするために、Raspberry Pi ボードを使用し、Apache Qpid Proton AMQP ライブラリを使用して、温度などのセンサー データをフィードしました。Proton は、さまざまな環境でコンパイルして AMQP メッセージを送信できる必要最小限の機能を備えた移植可能なライブラリです。Proton は、Azure サービス バスと完全に相互運用可能です。Proton AMQP ライブラリの詳細については、bit.ly/1icc6Ag (英語) を参照してください。

今回の例では、Raspberry Pi ボードで Proton ライブラリを直接コンパイルしました。Python バインディングを使用してシンプルなスクリプトを記述し、USB シリアル ポートからセンサーの読み取り値をキャプチャし、キャプチャしたデータを Azure サービス バスに送信します (図 2 参照)。

図 2 センサーからの読み取り値をキャプチャするための Raspberry Pi 読み取りの Python コード

#!/usr/bin/python
import sys
import commands
import re
import uuid
import serial
from proton import *
# Device ID
id = uuid.getnode()
# Topic address
address = "amqps://owner:key@address.servicebus.windows.net/telemetry"
# Open serial port
ser = serial.Serial('/dev/ttyACM0', 9600)
# Create Proton objects
messenger = Messenger()
while True:
  # Read values from Arduino in the form K1:V1_K2:V2_...
  temp = ser.readline().rstrip('\r\n')
  print temp
  # Create AMQP message
  message = Message()
  # Initialize properties
  message.properties = dict()
  message.properties[symbol("did")] = symbol(id)
  # Map string to list, symbolize, create dict and merge
  pairs=map(lambda x:x.split(':'), temp.split('_'))
  symbols = map(lambda x:(symbol(x[0]),int(x[1])), pairs)
  message.properties.update(dict(symbols))
  message.address = address
  messenger.put(message)
  messenger.send()

Python スクリプトは、Azure サービス バスの "telemetry" というトピックを直接アドレス指定します。そのためには、サービス バスの標準認証トークンを含み、AMQP プロトコルを使用して指定する接続文字列を使用します。現実の環境では、もっと洗練された認証メカニズムを使用して、接続パラメーターが漏えいしないようにする必要があります。

数多くの Raspberry デバイスがデータの収集を始めるとします。各 Raspberry デバイスは Device ID (DID) を送信します。この ID を後ほど再度使用して平均温度を計算します。今回の例では、UUID モジュールを使用して DID を生成し、システムの MAC アドレスを取得します。

USB 経由で Raspberry Pi に接続される Arduino Esplora ボードは、各デバイスからの読み取り値を集めます。Esplora は、組み込みセンサーを備えた一体型のボードです。このボードは、温度などの環境パラメーターを簡単に読み取ってシリアル バスに送信します。次に、USB ケーブルのもう一方の端点にある Python スクリプトが出力値を読み取ります。図 3 に、センサー値をシリアル ポートに出力する Arduino スキーマの例を示します。

図 3 Raspberry Pi の読み取り値を収集する Arduino コード

void loop()
{
  int celsius = Esplora.readTemperature(DEGREES_C);
  int loudness = Esplora.readMicrophone();
  int light = Esplora.readLightSensor();
  Serial.print("T:");
  Serial.print(celsius);
  Serial.print("_");
  Serial.print("M:");
  Serial.print(loudness);
  Serial.print("_");
  Serial.print("L:");
  Serial.print(light);
  Serial.println();
  // Wait a second
  delay(1000);
}

ビッグ データの配置を選択する

データ分析に使用する Hadoop ソリューションの種類にはいくつか選択肢があります。配置の種類を選択することで、分析に必要なデータの収集方法と場所を決定します。

Azure は、HDInsight による魅力的なソリューションを提供します。このソリューションは、Hadoop フレームワークをサービスとして公開します。Hortonworks Data Platform (HDP) for Windows に基づくこの Hadoop ディストリビューションには、ジョブが Azure Blob ストレージの入力データに直接アクセスできるようにするコネクターが含まれています。

つまり、入力ファイルを受け取るために Hadoop クラスターを稼働させる必要はありません。Blob ストレージ コンテナーにファイルをアップロードします。このコンテナーは、HDInsight が後で使用します。ファイルのバッチを分析する場合、HDInsight クラスターを数分で起動し、一連のジョブを数時間実行後に、シャットダウンします。これにより、計算時間のコストが低くなります。

一方、HDP などの Hadoop の標準ディストリビューションまたは Azure 仮想マシン (VM) の Cloudera Distribution を配置することを選択すると、クラスターを最新の状態に保持する必要があります。また、正しく構成して最適な操作を行えるようにしなければなりません。HDInsight に含まれないカスタム Hadoop コンポーネント (HBase など) をストレージ メカニズムとして使用する場合、このアプローチに意味があります。

Blob ストレージにテレメトリ データを保存する

Azure サービス バスからデータを抽出するのは簡単です。サブスクリプションの "リーダー" または "リスナー" としてワーカー ロールを使用します。その後、HDInsight が使用できる入力ファイルにメッセージを蓄積します。

まず、Azure サービス バス トピックで 1 つまたは複数のサブスクリプションを設定します。これにより、データ ストリームを分割またはルーティングするときに、要件に応じてある程度自由が利きます。少なくとも、"すべてをキャッチする" サブスクリプションを作成して、すべての受信メッセージを保存することを考えます。Azure サービス バスのサブスクリプションでフィルターを使用することもできます。これは、特定のメッセージ用に追加ストリームを作成します。図 4 に、C# および Azure サービス バス SDK ライブラリを使用してトピックとサブスクリプションを作成する例を示します。

図 4 Azure サービス バス サブスクリプション

var namespaceManager = 
  NamespaceManager.CreateFromConnectionString(connectionString);
// Create the Topic
if (!namespaceManager.TopicExists("telemetry"))
{
  namespaceManager.CreateTopic("telemetry");
}
// Create a "catch-all" Subscription
if (!namespaceManager.SubscriptionExists("telemetry", "all"))
{
  namespaceManager.CreateSubscription("telemetry", "all");
}
// Create an "alerts" subscription
if (!namespaceManager.SubscriptionExists("telemetry", "alert"))
{
  SqlFilter alertFilter = new SqlFilter("type = 99");
  namespaceManager.CreateSubscription("telemetry", 
  "alert", alertFilter);
}

Azure サービス バス サブスクリプションを作成したら、メッセージを受信および保存できるようになります。今回の例では、コンピューターにも人間にもわかりやすい CSV 形式を使用します。着信メッセージをできるだけ迅速に読み取るため、ワーカーでは多くのタスクを作成します (今回の例では 10 個)。また、1 つずつ読み取る代わりに、Async メソッドを使用してメッセージをバッチで読み取ります。"all" サブスクリプションと "telemetry" トピックでメッセージを受信します (図 5 参照)。

図 5 サブスクリプションからメッセージを受信して Blob ストレージに保存する

SubscriptionClient client = 
  SubscriptionClient.CreateFromConnectionString(connectionString, 
  "telemetry", "all", ReceiveMode.ReceiveAndDelete);
List<Task> tasks = new List<Task>();
for (int i = 0; i < NBTASKS; i++)
{
  var id = i; // Closure alert
  Task t = Task.Run(async () =>
  {
    BlobStorageWriter writer = new BlobStorageWriter(id);
    while (true)
    {
      var messages = await client.ReceiveBatchAsync(BATCH_SIZE);
      foreach (var message in messages)
      {
        try
        {
          await writer.WriteOneLine(TelemetryMessage.Stringify(message));
        }
        catch (Exception ex)
        {
          Trace.TraceError(ex.Message);
        }
      }
    }
  });
  tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());

TelemetryMessage.Stringify メソッドは、テレメトリ データを含む CSV 形式のテキスト行を単純に返します。また、Azure サービス バスのヘッダーから、メッセージ ID またはキューに格納された時刻といった有益なフィールドを抽出することもできます。

BlobStorageWriter.WriteOneLine ジョブは、Blob に行を直接書き込みます。10 個のタスクを並列処理できるので、同じ番号の Blob が一度に影響を受けます。WriteOneLine では、HDInsight のために随時ファイルをローテーションしてピックアップするようにします。いつ新しいファイルに切り替えるかを決定するために、ファイルに書き込まれた行数と、Blob が作成されてからの時間という 2 つのパラメータを使用します (たとえば、1 時間ごとまたは 1,000,000 行に達したときに新しいファイルを作成する)。このメソッドは、非同期呼び出しを使用して、Blob ストリームへのメッセージ書き込み中のブロッキングを回避します (図 6 参照)。

図 6 Azure の Blob にメッセージからのデータを書き込む

public async Task WriteOneLine(string line)
{
  var bytes = Encoding.UTF8.GetBytes(string.Format("{0}\n", line));
  await destinationStream.WriteAsync(bytes, 0, bytes.Length);
  TimeSpan ts = DateTime.Now - startBlobTime;
  if (++linesWritten > MAX_LINES || ts.TotalSeconds > MAX_SECONDS)
  {
    Trace.TraceInformation(
      "Wrote " + linesWritten + " lines to " + currentBlob.Name);
    GetNextBlob();
    linesWritten = 0;
  }
}

結果のファイルには、次に示すように、テレメトリ メッセージから抽出したデータが含まれます。

145268284e8e498282e20b01170634df,test,24,980,21,2014-03-14 13:43:32
dbb52a3cf690467d8401518fc5e266fd,test,24,980,21,2014-03-14 13:43:32
e9b5f508ef8c4d1e8d246162c02e7732,test,24,980,21,2014-03-14 13:43:32

抽出したデータには、メッセージ ID、デバイス ID、3 つの読み取り値、メッセージがキューに格納された日付などが含まれています。この形式は次の手順で簡単に解析できます。

HDInsight を使用してデータを分析する

HDInsight の最もすばらしいメリットは、コマンドラインから直接完全な Hadoop クラスターを開始し、ジョブを実行して、クラスターのプロビジョニングを解除できることです。VM にログオンしたり、構成をカスタマイズする必要はまったくありません。Windows で Windows PowerShell を使用するか、Mac または Linux でクロス プラットフォームのコマンドライン ツールを使用して HDInsight をプロビジョニングおよび管理できます。

統合 Azure PowerShell コマンドレットは、bit.ly/1tGirZk からダウンロードできます。このようなコマンドレットには、Azure インフラストラクチャを管理するために必要なすべてのコマンド (HDInsight のクラスターなど) が含まれています。発行の設定をインポートして既定のサブスクリプションを選択したらと、1 行のコマンドだけで新しい HDInsight クラスターを作成できます。

New-AzureHDInsightCluster -Name "hditelemetry" -Location "North Europe" -DefaultStorageAccountName "telemetry.blob.core.windows.net" -DefaultStorageAccountKey "storage-account-key" -DefaultStorageContainerName "data" -ClusterSizeInNodes 4

このコマンドは、HDInsight クラスターに、ファイル システムのルートとして既存のストレージ アカウントとコンテナーを使用するように指示します。これが、取り込みプロセスが生成するすべてのテレメトリ データにアクセスする方法です。データ量や必要とする並列処理の程度に応じて、クラスターで使用するワーカー ノード数を選択することもできます。

クラスターを稼働したら、リモート デスクトップ アクセスを有効にします。このようにすると、他のユーザーがヘッド ノードにログオンし、標準の Hadoop コマンドやツールを使用して対話型セッションを開始できます。ただし、Windows PowerShell を利用して Map Reduce ジョブ、Hive ジョブ、または Pig ジョブを起動するリモート コマンドを使用すると、さらに高速になります。

Pig ジョブを使用して平均温度を計算します。Pig は当初、Yahoo で開発されました。Pig のおかげで、Hadoop を使用するユーザーは、Hadoop を使用した大きなデータ セットの分析に専念し、マッパーやリデューサーのプログラムを記述するためにかける時間を短縮できます。通常の Pig スクリプトには次の 3 つのステップがあります。

  1. 操作するデータを読み込む
  2. 一連のデータ変換を実行する (一連のマッパーやリデューサーのタスクに変換される)
  3. 画面に結果をダンプするか、ファイルに保存する

次の例では、Exploratory Data Analysis (EDA) フェーズで Pig インタープリターを使用し、スクリプトをインタラクティブに実行することによってこの手順を実現する代表的な方法を示します。

data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);
data1 = group data by did;
data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);
dump data2;

このスクリプトを Pig インタープリターに直接入力する場合、多くの温度データ ポイントと DID ごとに測定された平均値を含む表が表示されます。ご覧のとおり、Pig 構文はかなり明白です。さまざまなデータ操作手順が明白に分けられています。

  • 最初の load ステートメントは、入力フィールドの名前と型を指定して、CSV ファイルからデータを読み込みます。
  • 次に、データを DID (デバイス) ごとにグループ化します。
  • 結果のデータセットは、COUNT や AVG のような集計関数を使用して生成します。

スクリプトの処理が終了したら、Windows PowerShell を使用してこのタスクを自動化できます。New-AzureHDInsightPigJobDefinition コマンドレットを使用して、作成されたスクリプトで Pig ジョブを初期化します。次に、Start-AzureHDInsightJob および Wait-AzureHDInsightJob を使用してジョブを開始し、ジョブの終了を待機します (図 7 参照)。さらに、Get-AzureHDInsightJobOutput を使用して結果を取得します。

図 7 HDInsight でジョブを挿入、分析、および開始する

$PigScript = "data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);" +
"data1 = group data by did;" +
"data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);" +
"dump data2;"
# Define a Pig job
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $PigScript
# Start the job
$pigJob = Start-AzureHDInsightJob -Cluster "hditelemetry" -JobDefinition $pigJobDefinition
# Wait for the job to finish
Wait-AzureHDInsightJob -Job $pigJob -WaitTimeoutInSeconds 3600
# Get the job results
Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId –StandardOutput

コマンドライン コンソールに表示される結果は、次のようになります。

C:\> Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId
(test,29091,24.0)
(49417795060,3942,30.08371385083714)

この場合、かなり多くのテスト測定値と Raspberry Pi からの約 4,000 件の読み取り値があります。この読み取り値の平均は 30 度です。

まとめ

Azure サービス バスは、あらゆる種類のデバイスからデータを収集するための信頼性の高い迅速な方法です。このデータを保存して分析できるようにするには、堅牢なストレージと分析エンジンが必要です。Azure HDInsight は、このレベルのストレージ用に Hadoop クラスターを作成して管理するプロセスを抽象化します。これは、Windows PowerShell または Mac/Linux Azure コマンドライン インターフェイスといったツールを使用して構成および自動化できる、非常にスケーラブルなソリューションです。

Thomas Conte は、Developer & Platform Evangelism (DPE) 部門の Microsoft Azure プラットフォームのテクニカル エバンジェリストです。彼の役割は、コード サンプル、出版物、および講演を通じて、開発者、アーキテクト、およびソフトウェア パートナーがテクノロジにアクセスすることを促進することです。彼は、オープン ソース環境のできるだけ多くのマイクロソフト以外のテクノロジで Microsoft Azure を実行しようと努めています。twitter.com/tomconte (英語) でフォローしてください。

Bruno Terkaly は、マイクロソフトの開発者エバンジェリストです。彼の豊富な知識は、数多くのプラットフォーム、言語、フレームワーク、SDK および API を使用してコードを記述してきた、何年にも上る業界での経験に基づいています。彼は、特に Microsoft Azure プラットフォームを使用して、クラウド ベースのアプリケーション構築に関するコードの記述、ブログ投稿、およびライブ プレゼンテーションを行って時間を過ごしています。ブログは、blogs.msdn.com/b/brunoterkaly (英語) で公開されています。

Ricardo Villalobos は、経験豊かなソフトウェア アーキテクトとして、多くの業界の企業用アプリケーションを 15 年以上にわたって設計および作成しています。さまざまな技術認定資格の保持者であり、ダラス大学で経営管理の修士号を取得しています。彼はマイクロソフトの DPE Globally Engaged Partners チームのクラウド アーキテクトを務め、世界中の企業の Windows Azure ソリューション実装を支援しています。ブログは blog.ricardovillalobos.com (英語) で公開されています。

この記事のレビューに協力してくれたマイクロソフト技術スタッフの Rafael Godinho および Jeremiah Talkar に心より感謝いたします。