Fluentd で収集したログを Azure Event Hubs に連携する方法
Microsoft Japan Data Platform Tech Sales Team
ログ収集に Fluentd を利用されている方は多いかと思います。その収集したログを kafka にキューイングし、必要に応じて Spark で処理をしつつストレージ等に蓄積されているケースもあるかと思いますが、Azure には大量のストリーミングデータをキューイングする Event Hubs、更にキューイングされたデータを加工した後に別サービスに出力する Stream Analytics という PaaS が用意されております。今回は Fluentd から Azure Event Hubs にログを飛ばしてキューイングし、Stream Analytics でAzure Blob Storage に出力してログを蓄積する方法について触れたいと思います。
Fluentd は他のコンポーネントと連携するためのプラグインが豊富であり、Azure Event Hubs 用のプラグインも (Microsoft が提供しているわけではないですが) github に公開され、gem リポジトリにもご登録いただいております。この Event Hubs 用プラグインを使用すると非常に簡単に Fluentd から Event Hubs にデータを流し込むことが可能です。今回は Treasure Data 殿が提供されている Fluentd の配布パッケージである td-agent にプラグインとして上記プラグインをインストールして動作確認していきます。
今回、OS は Red Hat Enterprise Linux 7.3 を用意し yum で td-agent をインストールした環境を準備しましたが、td-agent へのパスを設定しておきます。
export PATH=/opt/td-agent/embedded/bin/:$PATH
export LD_LIBRARY_PATH=/opt/td-agent/embedded/lib:$LD_LIBRARY_PATH
gem にて Azure Event Hubs のプラグインをインストールします。
gem install fluent-plugin-azureeventhubs
Td-agent のプラグインとしてインストールされたら、下記フォルダ下にライブラリ等のファイルが格納されているのでご確認ください。
/opt/td-agent/embedded/lib/ruby/gems/x.x.0/gems/fluent-plugin-azureeventhubs-x.x.x
次に Fluentd がログを送信する先の Event Hubs 環境を Azure Portal より作成します。
Event Hubs
1. Name Space の作成
Name Space の名前、リソースグループの名前、場所、Throughput Unit 数を指定します。ここで Throughput Unit とは Event Hubs の性能を制御する単位であり、1つの Throughput Unit につき以下の容量が含まれます。
- イングレス(受信): 1 秒あたり最大で 1 MB または 1,000 イベント (どちらか先に到達した方)
- エグレス(送信): 1 秒あたり最大で 2 MB
また、auto-inflate というのは性能負荷が増加した際に、設定した Throughput Unit 数まで自動で拡張してくれる機能となります。
2. Event Hubs の作成
Event Hubs の名前、およびパーティション数を指定します。Message Retention は受信したデータを Event Hubs に保持する期間であり、今回はデフォルトの1日のままにしておきます。
3. Access Policy の作成
Policy Name、および付与する権限 (ここでは Send ) を指定します。このポリシーは Fluentd から Azure Event Hubsにデータを送信する際に使用します。
4. 作成した Policy の接続文字列を確認
5. Fluentd の出力設定
/etc/td-agent/td-agent.conf に以下を追加
<match <tagname> > @type azureeventhubs_buffered connection_string Endpoint=sb:// <namespace name> .servicebus.windows.net/;SharedAccessKeyName= <policy name> ;SharedAccessKey= <shared access key> hub_name <event hubs name> </match>
6. td-agent の再起動
systemctl restart td-agent.service
以上により、Fluentd の該当する source のデータが Event Hubs に流し込まれるようになります。Service Bus Explorer を使用して Event Hubs にデータが流し込まれているかを確認することもできます。
Stream Analytics
1. Azure Event Hubs の Access Policy の作成
Policy Name、および付与する権限 (ここでは Listen ) を指定します。このポリシーは Azure Stream Analytics から Azure Event Hubs にキューイングされたデータを抽出する際に使用します。
2. Job の作成
ジョブ名、リソースグループの名前、場所を指定し Stream Analytics のジョブを作成
3. 入力の作成
入力として、上記で作成したEvent Hubs、および Event Hubs のポリシー名を指定して入力を作成。
4. 出力の作成
今回の出力先は Azure Blob Storage なため、シンクとして Blob ストレージを選択し、Blog Storage の情報(ストレージアカウント、キー、コンテナー)を入力し、必要に応じて指定したコンテナー下にどのパスで保存するかも指定します。また、出力形式として今回は CSV 形式を選択して、出力を作成します。
5. クエリの作成
今回は上記で作成した入力 (Event Hubs) からデータを受け取り、そのまま出力 (Azure Blob Storage) にデータを出力する簡単なクエリを定義し、作成します。
6. ジョブの開始
作成したジョブを開始します。
以上により、Fluentd で収集したデータを Event Hubs –> Stream Analytics を介して Azure Blog Storage に保存するストリームが出来上がりました。実際に Fluentd で収集したログなどが Azure Blob Storage に保存されていくか Azure Storage Explorer などを利用してご確認ください。
Stream Analytics では出力先を Azure Blob Storage だけではなく、以下のような出力先を指定することが可能で、一つのジョブで複数出力先にデータを出力することも可能です。また、出力形式として Avro、CSV、JSON を指定することもできます。今回は簡易なジョブ(受け取ったデータをそのまま出力)を作成しましたが、5秒間隔の集計値を出力したり、データを変換したりといったジョブを SQL ライクな Stream Analytics クエリ言語を用いて定義することも可能です。
これまで、ログ収集のプラットフォームは複雑になりがちで、運用も工数を要することが多かったかと思いますが、Event Hubs、Stream Analytics を使用することにより、Fluentd の収集機構を活かしつつ、その裏のキューイング、処理、蓄積をシンプルなアーキテクチャで実現することが可能となります。
是非ご活用いただければと思います。