一部のアプリケーションでは、ストリーム処理アプローチが必要です (Azure Stream Analytics によってなど)。しかし、厳密に継続的に実行する必要はありません。 次のような理由が考えられます:
- スケジュールに従って到着する入力データ (たとえば、毎時 0 分)
- 受信データがまばらまたは少量 (1 分あたりのレコード数が少ない)
- 時間枠機能の利点を活用しているが、本質的にはバッチで実行されるビジネス プロセス (財務や人事など)
- 小規模で実行時間の長いジョブを使用するデモンストレーション、プロトタイプ、またはテスト
これらのジョブを継続的に実行しないことによる利点はコスト削減です。これは、Stream Analytics ジョブは経過時間におけるストリーミング ユニット単位で請求されるからです。
この記事では、Azure Stream Analytics ジョブの自動一時停止を設定する方法について説明します。 スケジュールに基づいてジョブを自動一時停止および再開するタスクを構成します。 "一時停止" という用語は、ジョブの状態が「停止」であり、課金が回避されていることを意味します。
この記事では、全体的な設計、必要なコンポーネント、およびいくつかの実装の詳細について説明します。
注
ジョブを自動的に一時停止することには短所があります。 主なものは、低遅延機能およびリアルタイム機能が失われることと、ジョブの一時停止中に、入力イベントのバックログが管理されていない状態で増大するリスクの可能性です。 組織において大規模に実行されるほとんどの運用シナリオでは、自動一時停止は検討しないでください。
デザイン
この記事の例では、ジョブを N 分間実行してから、 M 分間停止します。 ジョブが一時停止されると、入力データは消費されず、アップストリームに蓄積されます。 ジョブが開始されると、そのバックログに追いつき、徐々に届くデータが処理され、そして再びシャットダウンされます。
ジョブ実行中は、タスクはメトリックが正常になるまでそのジョブを停止すべきではありません。 注目するメトリックは、入力バックログと基準値です。 少なくとも N 分間、両方がベースラインにあることを確認します。 この動作は、次の 2 つのアクションに変換されます:
- 停止したジョブは、M 分後に再起動されます。
- 実行中のジョブは、バックログと基準値のメトリックが正常になるとすぐに、N 分後にいつでも停止されます。
たとえば、N = 5 分、M = 10 分の場合を考えてみましょう。 これらの設定の場合、15 分間に受信したデータをジョブで処理するために少なくとも 5 分あります。 潜在的なコスト削減は最大 66% です。
ジョブを再起動するには、[最後に停止したとき] の開始オプションを使用します。 これは、ジョブが停止した以降にアップストリームでバックログに記録された、すべてのイベントを処理するよう Stream Analytics に指示するオプションです。
この状況には 2 つの注意点があります。 まず、入力ストリームの保有期間より長くジョブを停止状態に保つことはできません。 1 日に 1 回だけジョブを実行する場合は、イベントの保有期間を 1 日より長具する必要があります。 2 つ目としては、[最後に停止したとき] モードが受理されるためには、それまでにジョブが少なくとも 1 回は開始されている必要があります (そうでないと、文字どおり、これまで一度も停止したことがないことになるため)。 そのため、ジョブの最初の実行は手動である必要があります。または、そのケースに対応するようにスクリプトを拡張する必要があります。
最後の考慮事項は、これらのアクションをべき等になるように設定することです。 そうすることで、副作用なしでそれらを繰り返すことができ、使いやすさと回復性の両方が得られます。
コンポーネント
API 呼び出し
この記事では、次の点で Stream Analytics を操作する必要があります:
- 以下について現在のジョブの状態を取得します (Stream Analytics リソース管理)。
- ジョブが実行されている場合、
- ジョブが開始されてからの時間を取得する (ログ)。
- 現在のメトリック値を取得する (メトリック)。
- 該当する場合に、ジョブを停止する (Stream Analytics リソース管理)。
- ジョブが停止している場合、
- ジョブが停止してからの時間を取得する (ログ)。
- 該当する場合は、ジョブを開始する (Stream Analytics リソース管理)。
- ジョブが実行されている場合、
Stream Analytics リソース管理には、REST API、.NET SDK、またはいずれかの CLI ライブラリ (Azure CLIまたは PowerShell) を使用できます。
メトリックとログの場合、Azure 内のすべてが Azure Monitor の下に一元化され、同様の API サーフェスが選択できます。 API に対してクエリを実行すると、ログとメトリックには常に 1 分から 3 分の遅れがあります。 そのため、N を 5 に設定すると、通常、ジョブは実際には 6 - 8 分間実行されます。
もう 1 つの考慮事項は、メトリックは常に出力されているということです。 ジョブが停止すると、API は空のレコードを返します。 関連する値に焦点を当てるには、API 呼び出しの出力をクリーンアップする必要があります。
スクリプト言語
この記事では、PowerShellで自動一時停止を実装します。 この選択の第一の理由は、PowerShell がクロスプラットフォームになった点です。 任意のオペレーティング システムで実行できるため、デプロイが容易になります。 2 つ目の理由は、文字列ではなくオブジェクトを受け取って返す点です。 オブジェクトを使用すると、オートメーション タスクの解析と処理が簡単になります。
PowerShell では、Az PowerShell モジュールを使用します。これは、必要なすべてのものに対して Az.Monitor と Az.StreamAnalytics を開始します:
- 現在のジョブの状態を取得する Get-AzStreamAnalyticsJob
- Start-AzStreamAnalyticsJob または Stop-AzStreamAnalyticsJob
-
を使用する
InputEventsSourcesBacklogged(Stream Analytics メトリック より) -
で始まるイベント名に対する
Stop Job
ホスティング サービス
PowerShell タスクをホストするには、スケジュール実行を提供するサービスが必要です。 多くのオプションがありますが、サーバーレスオプションを2 つ次に示します。
- Azure Functions (ほぼすべてのコードを実行できるサーバーレス コンピューティング エンジン)。 毎秒までの精度で実行できるタイマー トリガーが用意されています。
- Azure Automation (クラウドのワークロードとリソースを運用するために構築されたマネージド サービス)。 その目的は適切ですが、最小のスケジュール間隔は 1 時間です (回避策を取ることで少なくできます)。
回避策が苦にならない場合は、Azure Automation の方がタスクをデプロイするための簡単な方法です。 ただし、この記事では、比較できるように、まずローカル スクリプトを記述します。 関数スクリプトを作成したら、Functions と Automation アカウントの両方にデプロイします。
開発者用ツール
Functions と Stream Analytics の両方について、Visual Studio Code を使用したローカル開発を強くお勧めします。 ローカル開発環境を使用すると、ソース コード管理を使用でき、デプロイを簡単に繰り返すことができます。 しかし、わかりやすくするために、この記事では、Azure portal でのプロセスを示します。
PowerShell スクリプトをローカルに記述する
スクリプトを開発する最良の方法は、ローカルで行うことです。 PowerShell はクロスプラットフォームであるため、任意のオペレーティング システムでスクリプトを記述してテストできます。 Windows の場合、Windows ターミナル と PowerShell 7、および Azure PowerShell を使用できます。
この記事で使用する最終的なスクリプトは、Azure Functions および Azure Automation で使用可能です。 ホスティング環境 (Functions または Automation) に固定化されている点で、次のスクリプトとは異なります。 この記事では、その側面については後ほど説明します。 まず、ローカルでのみ実行されるスクリプトのバージョンをステップ実行します。
このスクリプトは意図的に単純な形式で記述されているため、誰もが理解できます。
冒頭で必要なパラメーターを設定し、最初のジョブの状態を確認します。
# Setting variables
$restartThresholdMinute = 10 # This is M
$stopThresholdMinute = 5 # This is N
$maxInputBacklog = 0 # The amount of backlog you tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark you tolerate when stopping the job (in seconds, 10 is a good starting point at low Streaming Units)
$subscriptionId = "<Replace with your Subscription Id - not the name>"
$resourceGroupName = "<Replace with your Resource Group Name>"
$asaJobName = "<Replace with your Stream Analytics job name>"
$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName )/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# If not already logged, uncomment and run the two following commands:
# Connect-AzAccount
# Set-AzContext -SubscriptionId $subscriptionId
# Check current Stream Analytics job status
$currentJobState = Get-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."
ジョブが実行中の場合は、ジョブが少なくとも N 分実行されたかどうかを確認します。 また、バックログとその基準値も確認します。
# Switch state
if ($currentJobState -eq "Running")
{
# First, look up the job start time with Get-AzActivityLog
## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
$startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
$startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}
# Then gather the current metric values
## Get-AzMetric issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
$currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
$currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore
# Metrics are always lagging 1-3 minutes behind, so grabbing the last N minutes actually means checking N+3. This might be overly safe and can be fine-tuned down per job.
$Backlog = $currentBacklog.Data |
Where-Object {$_.Maximum -ge 0} | # Remove the empty records (when the job is stopped or starting)
Sort-Object -Property Timestamp -Descending |
Where-Object {$_.Timestamp -ge $startTimeStamp} | # Keep only the records of the latest run
Select-Object -First $stopThresholdMinute | # Take the last N records
Measure-Object -Sum Maximum # Sum over those N records
$BacklogSum = $Backlog.Sum
$Watermark = $currentWatermark.Data |
Where-Object {$_.Maximum -ge 0} |
Sort-Object -Property Timestamp -Descending |
Where-Object {$_.Timestamp -ge $startTimeStamp} |
Select-Object -First $stopThresholdMinute |
Measure-Object -Average Maximum # Here you average
$WatermarkAvg = [int]$Watermark.Average # Rounding the decimal value and casting it to integer
# Because you called Get-AzMetric with a TimeGrain of a minute, counting the number of records gives you the duration in minutes
Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."
# -le for lesser or equal, -ge for greater or equal
if (
($BacklogSum -ge 0) -and ($BacklogSum -le $maxInputBacklog) -and ` # is not null and is under the threshold
($WatermarkAvg -ge 0) -and ($WatermarkAvg -le $maxWatermark) -and ` # is not null and is under the threshold
($Watermark.Count -ge $stopThresholdMinute) # at least N values
)
{
Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
}
else {
Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet, it needs to have less than $($maxInputBacklog) backlogged events and under $($maxWatermark) sec watermark for at least $($stopThresholdMinute) minutes."
}
}
ジョブが停止している場合は、ログを調べて、最後の Stop Job アクションがいつ発生したかを確認します。
elseif ($currentJobState -eq "Stopped")
{
# First, look up the job start time with Get-AzActivityLog
## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
$stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
$stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}
# Get-Date returns a local time. You project it to the same time zone (universal) as the result of Get-AzActivityLog that you extracted earlier.
$minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes
# -ge for greater or equal
if ($minutesSinceStopped -ge $restartThresholdMinute)
{
Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
}
else{
Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
}
}
else {
Write-Output "asaRobotPause - Job $($jobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping is that doesn't go away!"
}
最後に、ジョブの完了をログに記録します。
# Final Stream Analytics job status check
$newJobState = Get-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
オプション 1: Azure Functions でタスクをホストする
参考として、Azure Functions チームによって包括的な PowerShell 開発者ガイドが管理されています。
まず、新しい "関数アプリ" が必要です。 関数アプリは、複数の関数をホストできるソリューションに似ています。
完全な手順はこちらで確認できますが、要点としては、Azure portal にアクセスし、以下を使用して新しい関数アプリを作成します:
- 発行: コード
- ランタイム: PowerShell Core
- バージョン: 7 以上
関数アプリをプロビジョニングしたら、その全体的な構成から始めます。
Azure Functions 用のマネージド ID
この関数には、Stream Analytics ジョブを開始および停止するためのアクセス許可が必要です。 これらのアクセス許可は、マネージド ID を使用して割り当てます。
最初の手順では、システム割り当てマネージド ID を関数に対して有効にします。そのためには、こちらの手順に従います。
これで、自動的に一時停止させたい Stream Analytics ジョブについての適切なアクセス許可を、その ID に付与できます。 このタスクでは、Stream Analytics ジョブ (関数ではなく) のポータルにある [アクセス制御 (IAM)] で、マネージド ID 型のメンバーに対する共同作成者ロールの割り当てを追加します。 前の関数の名前を選択します。
PowerShell スクリプトでは、マネージド ID が正しく設定されていることを確認するチェックを追加できます。 (最終的なスクリプトは GitHub で入手できます。)
# Check if a managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see /azure/app-service/overview-managed-identity for additional details.")
}
関数が起動していることを確認するために、いくつかのログ情報を追加します:
$currentUTCtime = (Get-Date).ToUniversalTime()
# Write an information log with the current time.
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"
Azure Functions のパラメーター
Functions でスクリプトにパラメーターを渡す最良の方法は、関数アプリのアプリケーション設定を環境変数として使用することです。
最初の手順では、関数アプリのページにあるこの手順に従って、パラメーターを [アプリ設定] として定義します。 必要なもの:
| 名前 | 値 |
|---|---|
maxInputBacklog |
ジョブを停止するときに許容するバックログの量。 イベント数を単位とし、最初の値としては 0 が適しています。 |
maxWatermark |
ジョブを停止するときに許容する基準値。 秒単位であり、低ストリーミング ユニットでは 10 が最初の値として適しています。 |
restartThresholdMinute |
M: 停止したジョブが再開されるまでの時間 (分)。 |
stopThresholdMinute |
N: 実行中のジョブが停止するまでのクールダウンの時間 (分)。 入力バックログは、その間 0 で維持する必要があります。 |
subscriptionId |
自動一時停止される Stream Analytics ジョブのサブスクリプション ID (サブスクリプション名前ではありません)。 |
resourceGroupName |
自動一時停止される Stream Analytics ジョブのリソース グループ名。 |
asaJobName |
自動一時停止される Stream Analytics ジョブの名前。 |
次に、これらの変数を読み込むように PowerShell スクリプトを更新します:
$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark
$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute
$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName
PowerShell モジュールの要件
Stream Analytics コマンド (Start-AzStreamAnalyticsJob など) を使用するために Azure PowerShell をローカルにインストールする必要があったのと同じように、関数アプリ ホストに追加する必要があります:
- 関数アプリのページの [Functions] で、[アプリ ファイル] を選択し、requirements.psd1 を選択します。
-
'Az' = '6.*'行のコメントを解除します。 - その変更を有効にするには、アプリを再起動します。
関数の作成
すべての構成が完了したら、関数アプリ内に特定の関数を作成してスクリプトを実行できます。
ポータルで、タイマーでトリガーされる関数を開発します。 関数が 0 */1 * * * * で毎分トリガーされ、"毎分 0 秒に" 読み取りを行うことを確認します。
必要に応じて、スケジュールを更新して [統合] のタイマー値を変更できます。
次に、[コードとテスト] で、"run.ps1" のスクリプトをコピーしてテストできます。 または、GitHub から完全なスクリプトをコピーすることもできます。 処理中に何かが失敗した場合に適切なエラーを生成するために、ビジネス ロジックは try/catch ステートメントに移動されました。
[コードとテスト] ウィンドウで [テスト/実行] を選択して、すべてが正常に実行されることを確認できます。 [監視] ウィンドウを確認することもできますが、常に実行 2 回程度遅れています。
関数の実行に関するアラートの設定
最後に、関数が正常に実行されなかった場合は、アラートを介して通知を受け取ります。 アラートにはわずかなコストがかかりますが、よりコストを必要とする状況を防止できる可能性があります。
関数アプリのページの [ログ] で、次のクエリを実行します。 過去 5 分間に失敗したすべての実行が返されます。
requests
| where success == false
| where timestamp > ago(5min)
| summarize failedCount=sum(itemCount) by operation_Name
| order by failedCount desc
クエリ エディターで、[新しい警告ルール] を選択します。 ウィンドウが開いたら、[測定] を次のように定義します。
- メジャー: failedCount
- 集計の種類: 合計
- 集計の粒度: 5 分
次に、[アラート ロジック] を次のように設定します:
- 演算子: より大きい
- しきい値: 0
- 評価の頻度: 5 分
そこから、新しいアクション グループを再利用または作成します。 そして、構成を完了します。
アラートが正しく設定されたことを確認するには、PowerShell スクリプトの任意の場所に throw "Testing the alert" を追加してから、メールを受信するために 5 分待ちます。
オプション 2: Azure Automation でタスクをホストする
まず、新しい Automation アカウントが必要です。 Automation アカウントは、複数の Runbook をホストできるソリューションに似ています。
手順については、「Azure portal を使用して Automation アカウントを作成する」クイック スタートを参照してください。 システム割り当てマネージド ID は、[詳細] タブで直接使用できます。
参考までに、Automation チームには、PowerShell Runbook の使用を開始するためのチュートリアル があります。
Azure Automation のパラメーター
Runbook では、PowerShell の従来のパラメーター構文を使用して引数を渡すことができます:
Param(
[string]$subscriptionId,
[string]$resourceGroupName,
[string]$asaJobName,
[int]$restartThresholdMinute,
[int]$stopThresholdMinute,
[int]$maxInputBacklog,
[int]$maxWatermark
)
Azure Automation のマネージド ID
Automation アカウントは、プロビジョニング時にマネージド ID を受け取っているはずです。 ただし、必要に応じて、この手順を使用してマネージド ID を有効にすることができます。
関数の場合と同様に、自動的に一時停止させる Stream Analytics ジョブに対する適切なアクセス許可を付与する必要があります。
アクセス許可を付与するには、(Automation ページではなく) Stream Analytics ジョブのポータルにある [アクセス制御 (IAM)]で、マネージド ID 型のメンバーに対する共同作成者 ロールの割り当てを追加します。 前の Automation アカウントの名前を選択します。
PowerShell スクリプトでは、マネージド ID が正しく設定されていることを確認するチェックを追加できます。 (最終的なスクリプトは GitHub で入手できます。)
# Ensure that you don't inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect by using a managed service identity
try {
$AzureContext = (Connect-AzAccount -Identity).context
}
catch{
Write-Output "There is no system-assigned user identity. Aborting.";
exit
}
Runbook を作成しています
構成が完了したら、Automation アカウント内に特定の Runbook を作成してスクリプトを実行できます。 ここでは、要件として Azure PowerShell を追加する必要はありません。 既に組み込まれています。
ポータルの [プロセス オートメーション] の [Runbook] を選択します。 次に、[Runbook の作成] を選択し、Runbook の種類として [PowerShell] を選択し、バージョンとして 7 より上の任意のバージョン (現時点では [7.1 (プレビュー)]) を選択します。
これで、スクリプトを貼り付けてテストできます。 GitHub から完全なスクリプトをコピーできます。 処理中に何かが失敗した場合に適切なエラーを生成するために、ビジネス ロジックは try/catch ステートメントに移動されました。
[テスト ペイン] で、すべてが正しく作成されていることを確認できます。
その後、Runbook をスケジュールにリンクできるように、ジョブを発行する ([発行]を選択して) 必要があります。 スケジュールの作成とリンクは簡単なプロセスです。 今こそ、1 時間より短いスケジュール間隔を実現するための回避策があることを思い出してください。
最後に、アラートを設定できます。 最初の手順では、Automation アカウントの診断設定を使用してログを有効にします。 2 番目の手順は、Functions の場合と同様に、クエリを使用してエラーを見つけることです。
結果
Stream Analytics ジョブでは、すべてが想定どおりに実行されていることを 2 か所で確認できます。
アクティビティ ログを次に示します:
メトリックは次のとおりです:
スクリプトを理解したら、スコープを拡張するためにスクリプトを再作業するのは簡単な作業です。 1 つのジョブではなく、ジョブの一覧を対象とするようにスクリプトを簡単に更新できます。 タグ、リソース グループ、またはサブスクリプション全体を使用して、より大きなスコープを定義して処理できます。
サポートを受ける
詳細については、「Azure Stream Analytics に関する Microsoft Q&A のページ」を参照してください。
次の手順
PowerShell を使用して Azure Stream Analytics ジョブの管理を自動化する方法の基本を学習しました。 詳細については、以下の記事をお読みください: