Airflow ログと Azure Monitor を統合する方法
この記事では、Azure Monitor で Microsoft Azure Data Manager for Energy インスタンスに関する Airflow ログの収集を始める方法について説明します。 この統合機能は、Airflow DAG (Directed Acyclic Graph: 有向非巡回グラフ) の実行エラーをデバッグするのに役立ちます。
前提条件
既存の Log Analytics ワークスペース。 このワークスペースは、Log Analytics ワークスペースで Kusto 照会言語 (KQL) クエリ エディターを使って Airflow ログのクエリを実行するために使われます。 役に立つリソース: Azure portal で Log Analytics ワークスペースを作成する。
既存の ストレージ アカウント: Airflow ログの JSON ダンプを格納するために使われます。 ストレージ アカウントは、Log Analytics ワークスペースと同じサブスクリプションに含まれている必要はありません。
ストレージ アカウントでログを収集するための診断設定の有効化
すべての Azure Data Manager for Energy インスタンスには、Azure Data Factory Workflow Orchestration Manager (Apache Airflow を使用) インスタンスが組み込まれています。 内部のトラブルシューティングとデバッグのために Airflow ログを収集します。 Airflow ログは、次の方法で Azure Monitor と統合できます。
- ストレージ アカウント
- Log Analytics ワークスペース
上の 2 つのオプションのいずれかを使ってログにアクセスするには、診断設定を作成する必要があります。 各診断設定には、3 つの基本的な部分があります。
部分 | 説明 |
---|---|
Name | これは診断ログの名前です。 ログごとに一意の名前を設定するようにします。 |
カテゴリ | 各送信先に送信するログのカテゴリ。 カテゴリのセットは、Azure サービスごとに異なります。 参照: サポートされているリソース ログのカテゴリ |
変換先 | ログを送信する 1 つ以上の送信先。 可能な同じ送信先のセットを、すべての Azure サービスで共有します。 各診断設定では、1 つまたは複数の送信先を定義できますが、特定の種類の送信先は 1 つだけです。 ストレージ アカウント、Event Hubs 名前空間、またはイベント ハブにする必要があります。 |
診断設定を設定するには、次の手順のようにします。
Microsoft Azure Data Manager for Energy の概要ページを開きます。
左側のパネルで [診断設定] を選びます
[診断設定を追加する] を選択します
[ログ] の下の [Airflow Task Logs] (Airflow タスク ログ) を選びます
[ストレージ アカウントへのアーカイブ] を選択します
ログをアーカイブするサブスクリプションとストレージ アカウントを確認します。
ストレージ アカウント内を移動して Airflow ログをダウンロードする
Airflow タスク ログをストレージ アカウントにアーカイブするための診断設定が作成されたら、ストレージ アカウントの [概要] ページに移動できます。 その後、左側のパネルの [ストレージ ブラウザー] を使って、調査する適切な JSON ファイルを見つけることができます。 年から月そして日へと移動して、異なるディレクトリ間を直感的に閲覧できます。
左側のパネルで使用できるコンテナー間を移動します。
右側の情報ペインを開きます。 ログ ファイルをローカル環境に保存するための [ダウンロード] ボタンが含まれています。
ダウンロードしたログは、任意のエディターで分析できます。
ログを Log Analytics ワークスペースと統合するための診断設定の有効化
Microsoft Azure Data Manager for Energy インスタンスの概要ページの左側のパネルにある [診断設定] を使って、Airflow ログと Log Analytics ワークスペースを統合できます。
Log Analytics ワークスペースに統合された Airflow ログの操作
Kusto 照会言語 (KQL) を使用して、Log Analytics ワークスペースから、収集された Airflow ログの目的のデータを取得します。 事前構築済みのサンプル クエリを Log Analytics ワークスペースに読み込むか、独自のクエリを作成できます。
事前構築済みクエリの読み込み: リソースのメニューからログを選択します。 Log Analytics の [クエリ] ウィンドウが開き、リソースの種類に対して事前構築済みのクエリが表示されます。 使用可能なクエリを参照します。 実行するものを見つけて、[実行] を選びます。 クエリがクエリ ウィンドウに追加されて、結果が返されます。
クエリ エディターでクエリを記述する: Log Analytics ワークスペースのクエリ エディターで、次のクエリのコピー、貼り付け、編集を行ったり、独自のクエリを KQL に書き込んだりすることができます。
サンプル クエリ
このクエリは、ERROR レベルのすべての Airflow ログを返します。 結果をフィルター処理するには、Azure Data Manager for Energy インスタンス名の where 句と DAG 実行の関連付け ID を追加 (コメント解除) します。
OEPAirFlowTask
| extend ResourceName = tostring(split(_ResourceId , '/')[-1])
// | where ResourceName == "<the name of ADME instance>" // to filter on resourceName replace <...> and uncomment line
// | where CorrelationId == "<DAG run's runId>" // to filter on correlationID replace <...> with correlationId (same as runId) - we have created a duplicate for to maintain consistency of column name across all services
| where LogLevel == "ERROR"
| project TimeGenerated, DagName, LogLevel, DagTaskName, CodePath, Content
このクエリでは、前述の Azure Data Manager for Energy リソース内のすべての DAG 実行とそれに対応する関連付け ID が一覧表示されます。
OEPAirFlowTask
| extend ResourceName = tostring(split(_ResourceId , '/')[-1])
// | where ResourceName == "<the name of ADME instance>" // to filter on resourceName replace <...> and uncomment line
| distinct DagName, CorrelationId // correlationId is same as runId - we have created a duplicate for consistency in search across logs of all services
| sort by DagName asc
Log Analytics ワークスペースで収集された Airflow ログを使用してマニフェスト インジェスト プロセスのトラブルシューティングを行うのに役立つドキュメントを追加しました。
次のステップ
リソース ログの収集を開始したので、ログ データで興味深いデータが特定されたときに事前に通知するログ クエリ アラートを作成します。