持続的オーケストレーション
Durable Functions は Azure Functions の拡張機能です。 オーケストレーター関数を使用すると、関数アプリ内の他の持続的関数の実行を調整できます。 オーケストレーター関数には次のような特性があります。
- オーケストレーター関数では、手続き型コードを使用して関数のワークフローが定義されています。 宣言型スキーマまたはデザイナーは必要ありません。
- オーケストレーター関数では、他の持続的関数を同期および非同期のどちらでも呼び出すことができます。 呼び出された関数からの出力を、ローカル変数に確実に保存できます。
- オーケストレーター関数は持続性と信頼性を備えています。 関数で "await" または "yield" が実行されると、実行の進行状況に自動的にチェックポイントが設定されます。 プロセスがリサイクルされるときや VM が再起動されるときに、ローカルの状態が失われることがありません。
- オーケストレーター関数は長時間実行される可能性があります。 "オーケストレーション インスタンス" の全体的な継続時間は、秒、日、月の単位になる場合、またはいつまでも終了しない場合があります。
この記事では、オーケストレーター関数の概要と、オーケストレーター関数によってアプリ開発のさまざまな課題を解決する方法について説明します。 Durable Functions アプリで使用可能な関数の種類をまだよく理解していない場合は、最初に Durable Functions の種類に関する記事をご覧ください。
オーケストレーション ID
オーケストレーションの各 "インスタンス" にはインスタンスの識別子があります ("インスタンス ID" とも呼ばれます)。 既定では、各インスタンス ID は自動生成される GUID です。 ただし、ユーザーが生成した文字列値をインスタンス ID にすることもできます。 各オーケストレーション インスタンス ID は、タスク ハブ内で一意である必要があります。
インスタンス ID に関するいくつかのルールを次に示します。
- インスタンス ID は、1 から 100 文字にする必要があります。
- インスタンス ID の先頭文字を
@
にすることはできません。 - インスタンス ID に、
/
、\
、#
、?
の各文字を含めることはできません。 - インスタンス ID に、制御文字を含めることはできません。
Note
一般に、可能な限り自動生成されたインスタンス ID を使用することをお勧めします。 ユーザー生成のインスタンス ID は、オーケストレーション インスタンスと、外部アプリケーション固有のエンティティ (注文書やドキュメントなど) との間に一対一のマッピングが存在するシナリオのためのものです。
また、文字制限ルールの実際の適用は、アプリで使用されているストレージ プロバイダーによって異なる場合があります。 正しい動作と互換性を確保するために、前に示したインスタンス ID 規則に従うことを強くお勧めします。
ほとんどのインスタンス管理操作で、オーケストレーション インスタンス ID は必須のパラメーターです。 また、トラブルシューティングや分析を目的とする Application Insights でのオーケストレーション追跡データによる検索などの診断にも重要です。 このため、生成されたインスタンス ID は、後で簡単に参照できる外部の場所 (データベースやアプリケーション ログなど) に保存することをお勧めします。
[信頼性]
オーケストレーター関数は、イベント ソーシング設計パターンを使用して、この関数の実行状態を確実に管理します。 Durable Task Framework では、オーケストレーションの現在の状態を直接格納する代わりに、追加専用ストアを使用して、関数オーケストレーションによって実行されるすべてのアクションが記録されます。 追加専用ストアには、実行時のすべての状態の "ダンプ" に比べて、多くのメリットがあります。 メリットには、パフォーマンス、スケーラビリティ、および応答性の向上が含まれます。 トランザクション データの最終的な一貫性、完全な監査証跡、および監査履歴も取得できます。 監査証跡では、信頼性の高い補正アクションがサポートされます。
Durable Functions では、イベント ソーシングが透過的に使用されます。 バックグラウンドで、オーケストレーター関数内の await
(C#) または yield
(JavaScript/Python) 演算子によって、オーケストレーター スレッドの制御が Durable Task Framework ディスパッチャーに戻ります。 Java の場合、特別な言語キーワードはありません。 代わりに、タスクで .await()
を呼び出すと、カスタム Throwable
を介してディスパッチャーに制御が戻されます。 次に、このディスパッチャーは、オーケストレーター関数がスケジュールした新しいアクション (1 つ以上の子関数を呼び出す、永続タイマーをスケジュールする、など) をストレージにコミットします。 透過コミット アクションは、追加専用ログと同様に、すべての新しいイベントをストレージに追加することで、オーケストレーション インスタンスの実行履歴を更新します。 同様に、コミット アクションでは、実際の作業をスケジュールするためにストレージにメッセージが作成されます。 この時点では、メモリからオーケストレーター関数をアンロードできます。 既定では、Durable Functions は Azure Storage を実行時状態ストアとして使用しますが、他のストレージ プロバイダーもサポートされています。
オーケストレーション関数にさらに処理すべき作業 (応答メッセージの受信や永続タイマーの終了など) が与えられると、オーケストレーターが起動し、関数全体が始めから再実行され、ローカルの状態が再構築されます。 このリプレイ中にコードが関数を呼び出そうとする (または他の非同期作業を行おうとする) と、Durable Task Framework によって現在のオーケストレーションの実行履歴が確認されます。 アクティビティ関数が既に実行され、いくつかの結果が生成されていることがわかった場合は、関数の結果をリプレイし、オーケストレーター コードの実行を続行します。 リプレイは、関数コードが完了するまで、または新しい非同期作業がスケジュールされるまで続行されます。
Note
リプレイ パターンが正常かつ確実に動作するためには、オーケストレーター関数のコードが "決定論的" である必要があります。 非決定論的オーケストレーター コードでは、ランタイム エラーや他の予期しない動作が発生する可能性があります。 オーケストレーター関数のコードの制限について詳しくは、オーケストレーター関数のコードの制約に関するトピックをご覧ください。
Note
オーケストレーター関数によってログ メッセージを出力される場合、リプレイ動作によって重複するログ メッセージが生成される可能性があります。 この動作が発生する理由とそれを回避する方法の詳細については、ログ記録に関するセクションをご覧ください。
オーケストレーションの履歴
Durable Task Framework のイベント ソーシング動作は、記述したオーケストレーター関数のコードと密接に結び付けられています。 次のオーケストレーター関数のような、アクティビティ チェーン オーケストレーター関数があるものとします。
Note
Azure Functions の Node.js プログラミング モデルのバージョン 4 は一般提供されています。 新しい v4 モデルは、JavaScript と TypeScript の開発者にとって、より柔軟で直感的なエクスペリエンスが得られるように設計されています。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。
次のコード スニペットでは、JavaScript (PM4) は、新しいエクスペリエンスであるプログラミング モデル V4 を示しています。
[FunctionName("HelloCities")]
public static async Task<List<string>> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var outputs = new List<string>();
outputs.Add(await context.CallActivityAsync<string>("SayHello", "Tokyo"));
outputs.Add(await context.CallActivityAsync<string>("SayHello", "Seattle"));
outputs.Add(await context.CallActivityAsync<string>("SayHello", "London"));
// returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
return outputs;
}
Durable Task Framework では、アクティビティ関数がスケジュール設定されるたびに、関数の実行状態のチェックポイントが、持続的なストレージ バックエンド (既定では Azure Table Storage) に記録されます。 この状態は、オーケストレーションの履歴と呼ばれます。
履歴テーブル
一般的に、Durable Task Framework は、チェックポイントごとに次の処理を実行します。
- 実行履歴を永続ストレージに保存します。
- オーケストレーターが起動する必要がある関数のメッセージをエンキューします。
- オーケストレーター自体のメッセージ (たとえば、持続的タイマー メッセージを) エンキューします。
チェックポイントが完了したら、さらに多くの作業が必要になるまで、オーケストレーター関数をメモリから自由に削除できるようになります。
Note
Azure Storage では、Table Storage へのデータ保存とキューへのデータ保存の間で、トランザクションが保証されません。 エラーを処理するために、Durable Functions Azure Storage プロバイダーでは最終的な一貫性パターンが使用されます。 こうしたパターンにより、チェックポイントの途中でクラッシュまたは接続の切断が発生しても、データが失われることはありません。 Durable Functions MSSQL ストレージ プロバイダーなど、代替のストレージ プロバイダーでは、より強力な整合性の保証が得られる場合があります。
完了すると、Azure Table Storage の上記の関数の履歴は次の表のようになります (わかりやすくするために一部省略されています)。
PartitionKey (InstanceId) | EventType | Timestamp | 入力 | 名前 | 結果 | Status |
---|---|---|---|---|---|---|
eaee885b | ExecutionStarted | 2021-05-05T18:45:28.852Z | null | HelloCities | ||
eaee885b | OrchestratorStarted | 2021-05-05T18:45:32.362Z | ||||
eaee885b | TaskScheduled | 2021-05-05T18:45:32.67Z | SayHello | |||
eaee885b | OrchestratorCompleted | 2021-05-05T18:45:32.67Z | ||||
eaee885b | TaskCompleted | 2021-05-05T18:45:34.201Z | """Hello Tokyo!""" | |||
eaee885b | OrchestratorStarted | 2021-05-05T18:45:34.232Z | ||||
eaee885b | TaskScheduled | 2021-05-05T18:45:34.435Z | SayHello | |||
eaee885b | OrchestratorCompleted | 2021-05-05T18:45:34.435Z | ||||
eaee885b | TaskCompleted | 2021-05-05T18:45:34.763Z | """Hello Seattle!""" | |||
eaee885b | OrchestratorStarted | 2021-05-05T18:45:34.857Z | ||||
eaee885b | TaskScheduled | 2021-05-05T18:45:34.857Z | SayHello | |||
eaee885b | OrchestratorCompleted | 2021-05-05T18:45:34.857Z | ||||
eaee885b | TaskCompleted | 2021-05-05T18:45:34.919Z | """Hello London!""" | |||
eaee885b | OrchestratorStarted | 2021-05-05T18:45:35.032Z | ||||
eaee885b | OrchestratorCompleted | 2021-05-05T18:45:35.044Z | ||||
eaee885b | ExecutionCompleted | 2021-05-05T18:45:35.044Z | "[""Hello Tokyo!"",""Hello Seattle!"",""Hello London!""]" | 完了 |
列の値に関する注意点:
- PartitionKey:オーケストレーションのインスタンス ID が含まれます。
- EventType:イベントの種類を表します。 すべての履歴イベントの種類の詳細については、こちらを参照してください。
- [タイムスタンプ] : 履歴イベントの UTC タイムスタンプ。
- Name:呼び出された関数の名前。
- 入力:関数の JSON 形式の入力。
- Result:関数の出力、つまり、戻り値。
警告
これはデバッグ ツールとしては便利ですが、このテーブルにあまり依存しないようにしてください。 Durable Functions 拡張機能の刷新に伴って変更される可能性があります。
タスクの完了を待機した後に関数が再開されるたびに、Durable Task Framework によってオーケストレーター関数が最初から再実行されます。 再実行のたびに、実行履歴が調べられて、現在の非同期タスクが完了しているかどうかが確認されます。 実行履歴にタスクが既に完了したことが示されている場合、フレームワークはそのタスクの出力を再生し、次のタスクに進みます。 実行履歴全体が再生されるまで、このプロセスが続けられます。 現在の実行履歴が再生されると、ローカル変数が以前の値に復元されます。
機能とパターン
次のセクションでは、オーケストレーター関数の機能とパターンについて説明します。
サブオーケストレーション
オーケストレーター関数では、アクティビティ関数を呼び出すことができますが、別のオーケストレーター関数を呼び出すこともできます。 たとえば、オーケストレーター関数のライブラリから大規模なオーケストレーションを作成できます。 つまり、オーケストレーター関数の複数のインスタンスを並列で実行できます。
詳細と例については、サブオーケストレーションに関する記事をご覧ください。
持続的タイマー
オーケストレーションでは、持続的タイマーをスケジュールして、遅延を実装したり、非同期アクションに対するタイムアウト処理を設定したりすることができます。 言語ネイティブの "sleep" API ではなく、オーケストレーター関数で持続的タイマーを使用します。
詳細と例については、持続的タイマーに関する記事をご覧ください。
外部イベント
オーケストレーター関数は、オーケストレーション インスタンスを更新するために、外部イベントを待ち受けできます。 Durable Functions のこの機能は、多くの場合、人による操作または他の外部コールバックを処理するときに便利です。
詳細と例については、外部イベントに関する記事をご覧ください。
エラー処理
オーケストレーター関数では、プログラミング言語のエラー処理機能を使用できます。 オーケストレーションのコードでは、try
/catch
のような既存のパターンがサポートされています。
オーケストレーター関数では、呼び出されるアクティビティまたはサブオーケストレーター関数に再試行ポリシーを追加することもできます。 アクティビティまたはサブオーケストレーター関数が例外で失敗した場合、指定した再試行ポリシーによって自動的に遅延し、指定した回数まで実行を再試行できます。
Note
オーケストレーター関数でハンドルされない例外が発生した場合、オーケストレーションのインスタンスは Failed
状態で完了します。 失敗したオーケストレーションのインスタンスを再試行することはできません。
詳細と例については、エラー処理に関する記事をご覧ください。
重要なセクション (Durable Functions 2.x、現在 .NET のみ)
オーケストレーションのインスタンスはシングルスレッドであるため、オーケストレーションの "内部" の競合状態について考慮する必要はありません。 ただし、オーケストレーションから外部システムとやりとりする場合、競合状態が発生する可能性があります。 外部システムと相互作用するときの競合状態を軽減するため、オーケストレーター関数では .NET の LockAsync
メソッドを使用して "クリティカル セクション" を定義できます。
次のサンプル コードで示すオーケストレーター関数では、クリティカル セクションが定義されています。 クリティカル セクションに入るには、LockAsync
メソッドを使用します。 このメソッドでは、持続的にロック状態を管理する持続エンティティへの 1 つ以上の参照を渡す必要があります。 クリティカル セクション内のコードを実行できるこのオーケストレーションのインスタンスは、一度に 1 つだけです。
[FunctionName("Synchronize")]
public static async Task Synchronize(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var lockId = new EntityId("LockEntity", "MyLockIdentifier");
using (await context.LockAsync(lockId))
{
// critical section - only one orchestration can enter at a time
}
}
LockAsync
では、持続的ロックが取得されて、破棄されたときにクリティカル セクションが終了する IDisposable
が返されます。 この IDisposable
の結果を using
ブロックと共に使用して、クリティカル セクションの構文表現を取得できます。 オーケストレーター関数がクリティカル セクションに入ると、1 つのインスタンスだけがそのコード ブロックを実行できます。 他のインスタンスがそのクリティカル セクションに入ろうとしても、前のインスタンスがそのクリティカル セクションを終了するまでブロックされます。
クリティカル セクションの機能は、持続性エンティティに対する変更を調整する場合にも役立ちます。 クリティカル セクションについて詳しくは、持続性エンティティの "エンティティ調整" に関するトピックをご覧ください。
Note
クリティカル セクションは Durable Functions 2.0 で使用できます。 現在、この機能が実装されているのは .NET インプロセス オーケストレーションだけです。 エンティティとクリティカル セクションは、dotnet 分離ワーカーの Durable Functions ではまだ使用できません。
HTTP エンドポイントの呼び出し (Durable Functions 2.x)
オーケストレーター関数のコードの制約に関する記事で説明されているように、オーケストレーター関数は I/O を行うことを許可されていません。 この制限に対する一般的な回避策は、I/O を行う必要があるすべてのコードをアクティビティ関数内にラップすることです。 外部システムとのやりとりが頻繁に行われるオーケストレーションでは、アクティビティ関数を使用して、HTTP の呼び出しを行い、その結果をオーケストレーションに返します。
この一般的なパターンを簡略化するため、オーケストレーター関数では、CallHttpAsync
メソッドを使用して、HTTP API を直接呼び出すことができます。
[FunctionName("CheckSiteAvailable")]
public static async Task CheckSiteAvailable(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
Uri url = context.GetInput<Uri>();
// Makes an HTTP GET request to the specified endpoint
DurableHttpResponse response =
await context.CallHttpAsync(HttpMethod.Get, url);
if ((int)response.StatusCode == 400)
{
// handling of error codes goes here
}
}
基本的な要求と応答のパターンのサポートに加えて、このメソッドでは、一般的な非同期 HTTP 202 ポーリング パターンの自動処理がサポートされており、マネージド ID を使用した外部サービスによる認証もサポートされています。
詳細と例については、HTTP 機能に関する記事をご覧ください。
Note
オーケストレーター関数からの HTTP エンドポイントの直接呼び出しは、Durable Functions 2.0 以降で使用できます。
複数のパラメーターを渡す
アクティビティ関数に複数のパラメーターを直接渡すことはできません。 オブジェクトの配列または複合オブジェクトを渡すことをお勧めします。
.NET では、ValueTuple オブジェクトを使用することもできます。 次の例では、C# 7 で追加された ValueTuple の新機能を使用しています。
[FunctionName("GetCourseRecommendations")]
public static async Task<object> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string major = "ComputerScience";
int universityYear = context.GetInput<int>();
object courseRecommendations = await context.CallActivityAsync<object>(
"CourseRecommendations",
(major, universityYear));
return courseRecommendations;
}
[FunctionName("CourseRecommendations")]
public static async Task<object> Mapper([ActivityTrigger] IDurableActivityContext inputs)
{
// parse input for student's major and year in university
(string Major, int UniversityYear) studentInfo = inputs.GetInput<(string, int)>();
// retrieve and return course recommendations by major and university year
return new
{
major = studentInfo.Major,
universityYear = studentInfo.UniversityYear,
recommendedCourses = new []
{
"Introduction to .NET Programming",
"Introduction to Linux",
"Becoming an Entrepreneur"
}
};
}