Durable Functions におけるタスク ハブ (Azure Functions)

Durable Functions の "タスク ハブ" は、保留中のすべての作業を含め、ストレージ内にあるアプリケーションの現在の状態を表すものです。 関数アプリの実行中、オーケストレーション、アクティビティ、エンティティ関数の進行状況は、タスク ハブに継続的に格納されます。 これにより、アプリケーションが何らかの理由で一時的に停止または中断した後、再起動が必要になった場合でも、中断したところから処理を再開することができます。 また、関数アプリを使ってコンピューティング worker を動的にスケーリングすることもできます。

関数アプリの概念とタスク ハブの概念を示す図。

概念的には、タスク ハブには次の情報が格納されます。

  • すべてのオーケストレーションとエンティティ インスタンスのインスタンス状態
  • 以下を含む、処理されるメッセージ
    • 実行を待機しているアクティビティを表すアクティビティ メッセージ
    • インスタンスへの配信を待機している任意のインスタンス メッセージ

アクティビティ メッセージとインスタンス メッセージの違いは、アクティビティ メッセージはステートレスなので、どこでも処理できるのに対し、インスタンス メッセージはインスタンス ID で識別される特定のステートフル インスタンス (オーケストレーションまたはエンティティ) に配信する必要がある点です。

内部的には、各ストレージ プロバイダーは、インスタンスの状態とメッセージを表現するために異なる組織を使うことがあります。 たとえば、Azure Storage プロバイダーの場合は Azure Storage Queues に、MSSQL プロバイダーの場合はリレーショナル テーブルにメッセージを格納します。 これらの違いは、アプリケーションの設計に関する限り重要ではありませんが、パフォーマンス特性に影響するものもあります。 この点については、後述する「ストレージでの表現」で説明します。

作業項目

タスク ハブのアクティビティ メッセージとインスタンス メッセージは、関数アプリが処理する必要がある作業を表しています。 関数アプリは、実行中に、タスク ハブから "作業項目" を継続的にフェッチします。 各作業項目は 1 つ以上のメッセージを処理しています。 2 種類の作業項目があります。

  • アクティビティ作業項目: アクティビティ関数を実行してアクティビティ メッセージを処理します。
  • オーケストレーター作業項目: オーケストレーターまたはエンティティ関数を実行して 1 つ以上のインスタンス メッセージを処理します。

worker は、構成された worker ごとのコンカレンシー制限に従って、複数の作業項目を同時に処理できます。

worker は作業項目を完了すると、その効果をタスク ハブにコミットします。 これらの効果は、実行された関数の種類によって異なります。

  • アクティビティ関数が完了すると、結果を含むインスタンス メッセージが作成され、親オーケストレーター インスタンスに送信されます。
  • オーケストレーター関数を使って、オーケストレーションの状態と履歴を更新します。新しいメッセージを作成することもできます。
  • 完了したエンティティ関数を使って、エンティティの状態を更新します。新しいインスタンス メッセージを作成することもできます。

オーケストレーションの場合、各作業項目はそのオーケストレーションの実行の 1 つのエピソードを表します。 オーケストレーターが処理する新しいメッセージが生成されると、エピソードが開始されます。 このようなメッセージは、オーケストレーションを開始する必要があることを示す場合、アクティビティ、エンティティ呼び出し、タイマー、またはサブオーケストレーションが完了したことを示す場合、外部イベントを表す場合があります。 このメッセージによって、オーケストレーターが結果を処理し、次のエピソードを続行する作業項目がトリガーされます。 そのエピソードは、オーケストレーターが完了するか、新しいメッセージを待機する必要があるポイントに到達した時点で終了します。

実行例

ファンアウトファンイン オーケストレーションについて考えてみましょう。2 つのアクティビティを並行して開始し、その両方が完了するまで待機するというものです。

[FunctionName("Example")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
    Task t1 = context.CallActivityAsync<int>("MyActivity", 1);
    Task t2 = context.CallActivityAsync<int>("MyActivity", 2);
    await Task.WhenAll(t1, t2);
}

このオーケストレーションは、クライアントによって開始された後、関数アプリによって一連の作業項目として処理されます。 完了した各作業項目によって、コミット時にタスク ハブの状態が更新されます。 手順は次のとおりです。

  1. あるクライアントが、インスタンス ID "123" の新しいオーケストレーションを開始するよう要求します。 クライアントがこの要求を完了すると、タスク ハブにはオーケストレーション状態のプレースホルダーとインスタンス メッセージが格納されます。

    workitems-illustration-step-1

    ラベル ExecutionStarted は、オーケストレーションの履歴に参加するさまざまなメッセージとイベントを識別する、多数ある履歴イベント種類の 1 つです。

  2. worker は "オーケストレーター作業項目" を実行して ExecutionStarted メッセージを処理します。 オーケストレーション コードの実行を開始するオーケストレーター関数を呼び出します。 このコードを使って、2 つのアクティビティをスケジュールし、結果を待機している間は実行を停止します。 worker がこの作業項目をコミットすると、タスク ハブのコンテンツは次のようになります

    workitems-illustration-step-2

    ランタイム状態は Running になり、2 つの新しい TaskScheduled メッセージが追加され、履歴には 5 つのイベント OrchestratorStartedExecutionStartedTaskScheduledTaskScheduledOrchestratorCompleted が含まれるようになりました。 これらのイベントは、このオーケストレーションの実行の最初のエピソードを表しています。

  3. worker は "アクティビティ作業項目" を実行し、TaskScheduled メッセージの 1 つを処理します。 入力 "2" を指定してアクティビティ関数を呼び出します。 アクティビティ関数が完了すると、その結果を含む TaskCompleted メッセージが作成されます。 worker がこの作業項目をコミットすると、タスク ハブのコンテンツは次のようになります

    workitems-illustration-step-3

  4. worker は "オーケストレーター作業項目" を実行して TaskCompleted メッセージを処理します。 オーケストレーションがまだメモリにキャッシュされている場合は、そのまま実行を再開できます。 そうでなければ、worker はまず履歴を再生してオーケストレーションの現在の状態を復旧します。 次にオーケストレーションを継続し、アクティビティの結果を配信します。 この結果を受け取った後も、オーケストレーションはまだ他のアクティビティの結果を待機しているので、もう一度実行を停止します。 worker がこの作業項目をコミットすると、タスク ハブのコンテンツは次のようになります

    workitems-illustration-step-4

    オーケストレーションの履歴には、さらに 3 つのイベント OrchestratorStartedTaskCompletedOrchestratorCompleted が含まれています。 これらのイベントは、このオーケストレーションの実行の 2 つ目のエピソードを表しています。

  5. worker は "アクティビティ作業項目" を実行し、残りの TaskScheduled メッセージを処理します。 入力 "1" を指定してアクティビティ関数を呼び出します。 worker がこの作業項目をコミットすると、タスク ハブのコンテンツは次のようになります

    workitems-illustration-step-5

  6. worker はもう 1 つの "オーケストレーター作業項目" を実行して TaskCompleted メッセージを処理します。 この 2 つ目の結果を受け取った後、オーケストレーションは完了します。 worker がこの作業項目をコミットすると、タスク ハブのコンテンツは次のようになります

    workitems-illustration-step-6

    ランタイム状態は Completed になり、オーケストレーションの履歴にはさらに 4 つのイベント OrchestratorStartedTaskCompletedExecutionCompletedOrchestratorCompleted が含まれています。 これらのイベントは、このオーケストレーションの実行の 3 つ目かつ最後のエピソードを表しています。

このオーケストレーションの実行の最終履歴には、12 個のイベント OrchestratorStartedExecutionStartedTaskScheduledTaskScheduledOrchestratorCompletedOrchestratorStartedTaskCompletedOrchestratorCompletedOrchestratorStartedTaskCompletedExecutionCompletedOrchestratorCompleted が含まれます。

注意

表示されるスケジュールは 1 つだけではありません。わずかに異なるスケジュールが多数あります。 たとえば、2 つ目のアクティビティが先に完了した場合、TaskCompleted インスタンス メッセージは両方とも 1 つの作業項目によって処理される可能性があります。 その場合、エピソードは 2 つしかないので、実行履歴は少し短くなり、次の 10 個のイベントが含まれます: OrchestratorStartedExecutionStartedTaskScheduledTaskScheduledOrchestratorCompletedOrchestratorStartedTaskCompletedTaskCompletedExecutionCompletedOrchestratorCompleted

タスク ハブの管理

次は、タスク ハブの作成または削除、複数の関数アプリを実行する場合のタスク ハブの正しい使い方、タスク ハブのコンテンツを検査する方法について詳しく見てみましょう。

作成と削除

関数アプリの初回起動時には、必要なリソースをすべて含む空のタスク ハブがストレージに自動的に作成されます。

既定の Azure Storage プロバイダーを使う場合、追加の構成は必要ありません。 それ以外の場合は、ストレージ プロバイダーがタスク ハブに必要なストレージ リソースを適切にプロビジョニングしてアクセスできるように、ストレージ プロバイダーの構成手順に関する記事に従ってください。

注意

タスク ハブは、関数アプリを停止または削除しても自動的には削除され "ません"。 そのデータを保持する必要がなくなった場合は、タスク ハブ、そのコンテンツ、または含まれているストレージ アカウントを手動で削除する必要があります。

ヒント

開発シナリオでは、クリーンな状態からの再起動が必要になることがよくあります。 これをすばやく行うには、構成したタスク ハブの名前を変更するだけ済みます。 こうすることで、アプリケーションの再起動時に新しい空のタスク ハブが強制的に作成されます。 この場合、古いデータは削除されないことに注意してください。

複数の関数アプリ

複数の関数アプリがストレージ アカウントを共有している場合、各関数アプリには個別のタスク ハブ名を構成する "必要があります"。 この要件はステージング スロットにも適用されます。各ステージング スロットは、一意のタスク ハブ名で構成する必要があります。 単一のストレージ アカウントには、複数のタスク ハブを含めることができます。 通常、この制限は他のストレージ プロバイダーにも適用されます。

次の図は、共有ストレージ アカウントと専用 Azure Storage アカウントの各関数アプリにタスク ハブが 1 つあることを示しています。

共有ストレージ アカウントと専用ストレージ アカウントの図

Note

タスク ハブ共有ルールの例外は、リージョンのディザスター リカバリー用にアプリを構成しているかどうかという点です。 詳細については、ディザスター リカバリーと地理的分散に関する記事を参照してください。

コンテンツ検査

タスク ハブのコンテンツを検査するには、一般的な方法がいくつかあります。

  1. 関数アプリ内では、インスタンス ストアのクエリを実行するメソッドがクライアント オブジェクトに用意されています。 サポートされているクエリの種類の詳細については、インスタンス管理に関する記事を参照してください。
  2. 同様に、HTTP API には、オーケストレーションとエンティティの状態のクエリを実行する REST 要求が用意されています。 詳細については、HTTP の API リファレンスに関する記事を参照してください。
  3. Durable Functions Monitor ツールを使うと、タスク ハブを検査できます。また、視覚的な表示のためのさまざまなオプションが用意されています。

一部のストレージ プロバイダーでは、基となるストレージに直接アクセスしてタスク ハブを検査することもできます。

  • Azure Storage プロバイダーを使っている場合、インスタンス状態はインスタンス テーブル履歴テーブルに格納され、Azure Storage Explorer などのツールを使って検査することができます。
  • MSSQL ストレージ プロバイダーを使っている場合、SQL クエリとツールを使って、データベース内のタスク ハブのコンテンツを検査することができます。

ストレージでの表現

各ストレージ プロバイダーは、ストレージ内のタスク ハブを表現するために異なる内部組織を使います。 この組織を理解することは、必須ではありませんが、関数アプリのトラブルシューティング時や、パフォーマンス、スケーラビリティ、コスト目標を確保しようとするときに役立ちます。 そのため、ストレージ プロバイダーごとに、データがストレージ内でどのように編成されるかについて簡単に説明します。 さまざまなストレージ プロバイダーのオプションとその比較については、Durable Functions ストレージ プロバイダーに関する記事を参照してください。

Azure Storage プロバイダー

Azure Storage プロバイダーは、次のコンポーネントを使用してストレージ内のタスク ハブを表します。

  • 2 つの Azure テーブルにインスタンスの状態が格納されます。
  • 1 つの Azure キューにアクティビティ メッセージが格納されます。
  • 1 つ以上の Azure キューにインスタンス メッセージが格納されます。 これらのいわゆる "コントロール キュー" はそれぞれ、インスタンス ID のハッシュに基づいて、すべてのインスタンス メッセージのサブセットが割り当てられるパーティションを表します。
  • BLOB やサイズの大きいメッセージのリースに使用される追加の BLOB コンテナーがいくつかあります。

たとえば、PartitionCount = 4 を持つ xyz という名前が付いたタスク ハブには、次のキューとテーブルが含まれています。

4 つのコントロール キューの Azure Storage プロバイダー ストレージのストレージ構成を示す図。

次に、これらのコンポーネントとそれらが果たす役割について詳しく説明します。

Azure Storage プロバイダーでタスク ハブがどのように表現されているかについては、Azure Storage プロバイダーのドキュメントを参照してください。

Netherite ストレージ プロバイダー

Netherite は、タスク ハブのすべての状態を、指定した数のパーティションに分割します。 ストレージでは、次のリソースが使われます。

  • 1 つの Azure Storage BLOB コンテナー。パーティションごとにグループ化されたすべての BLOB が格納されています。
  • 1 つの Azure テーブル。パーティションに関する発行されたメトリックが格納されています。
  • パーティション間でメッセージを配信するための Azure Event Hubs 名前空間。

たとえば、PartitionCount = 32 を持つ mytaskhub というタスク ハブは、ストレージ内で次のように表されます。

32 個のパーティションがある Netherite ストレージ組織を示す図。

注意

タスク ハブの状態はすべて x-storage BLOB コンテナー内に格納されます。 DurableTaskPartitions テーブルと EventHubs 名前空間には冗長データが含まれています。そのコンテンツが失われた場合、自動的に復旧することができます。 そのため、既定の有効期限を過ぎたメッセージを保持するために、Azure Event Hubs 名前空間を構成する必要はありません。

Netherite は、パーティションの現在の状態を表すために、ログとチェックポイントに基づいたイベントソーシング メカニズムを使います。 ブロック BLOB とページ BLOB の両方が使われます。 この形式をストレージから直接読み取ることはできないため、インスタンス ストアに対してクエリを実行するときには関数アプリが実行中である必要があります。

Netherite ストレージ プロバイダーのタスク ハブの詳細については、Netherite ストレージ プロバイダーのタスク ハブ情報に関する記事を参照してください。

MSSQL ストレージ プロバイダー

すべてのタスク ハブのデータは、いくつかのテーブルを使って、1 つのリレーショナル データベースに格納されます。

  • dt.Instancesdt.History の各テーブルにはインスタンスの状態が格納されます。
  • dt.NewEvents テーブルにはインスタンス メッセージが格納されます。
  • dt.NewTasks テーブルにはアクティビティ メッセージが格納されます。

MSSQL ストレージ組織を示す図。

複数のタスク ハブが同じデータベース内に独立して共存できるように、各テーブルには主キーの一部として TaskHub 列が含まれています。 他の 2 つのプロバイダーとは異なり、MSSQL プロバイダーにはパーティションの概念がありません。

MSSQL ストレージ プロバイダーのタスク ハブの詳細については、Microsoft SQL (MSSQL) ストレージ プロバイダーのタスク ハブ情報に関する記事を参照してください。

タスク ハブ名

タスク ハブは、これらの規則に準拠する必要がある名前で識別されます。

  • 英数字のみを含む
  • 文字で始まる
  • 最小長が 3 文字で、最大長が 45 文字である

次の例に示すように、タスク ハブ名は host.json ファイルで宣言されます。

host.json (Functions 2.0)

{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "hubName": "MyTaskHub"
    }
  }
}

host.json (Functions 1.x)

{
  "durableTask": {
    "hubName": "MyTaskHub"
  }
}

次の host.json ファイルの例に示すように、タスク ハブはアプリの設定を使用して構成することもできます。

host.json (Functions 1.0)

{
  "durableTask": {
    "hubName": "%MyTaskHub%"
  }
}

host.json (Functions 2.0)

{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "hubName": "%MyTaskHub%"
    }
  }
}

タスク ハブ名は、MyTaskHub アプリ設定の値に設定されます。 次の local.settings.json は、MyTaskHub 設定を samplehubname として定義する方法を示しています。

{
  "IsEncrypted": false,
  "Values": {
    "MyTaskHub" : "samplehubname"
  }
}

Note

デプロイ スロットを使用する場合は、アプリ設定を使用してタスク ハブ名を構成することをお勧めします。 特定のスロットが常に特定のタスク ハブを使用するようにする場合は、"スロット固定" アプリ設定を使用します。

タスク ハブ名は、host.json に加えて、オーケストレーション クライアント バインディング メタデータでも設定できます。 これは、別の関数アプリに存在するオーケストレーションまたはエンティティにアクセスする必要がある場合に便利です。 次のコードでは、アプリ設定として構成されているタスク ハブを操作するためにオーケストレーション クライアント バインドを使用する関数の記述方法を示しています。

[FunctionName("HttpStart")]
public static async Task<HttpResponseMessage> Run(
    [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}")] HttpRequestMessage req,
    [DurableClient(TaskHub = "%MyTaskHub%")] IDurableOrchestrationClient starter,
    string functionName,
    ILogger log)
{
    // Function input comes from the request content.
    object eventData = await req.Content.ReadAsAsync<object>();
    string instanceId = await starter.StartNewAsync(functionName, eventData);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Note

前記の C# の例は Durable Functions 2.x 用です。 Durable Functions 1.x の場合、IDurableOrchestrationContext の代わりに DurableOrchestrationContext を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョンに関する記事を参照してください。

Note

クライアント バインド メタデータでタスク ハブ名を構成する必要があるのは、ある関数アプリを使用して別の関数アプリのオーケストレーションとエンティティにアクセスする場合のみです。 クライアント関数がオーケストレーションやエンティティと同じ関数アプリで定義されている場合は、バインド メタデータでタスク ハブ名を指定するのは避ける必要があります。 既定では、すべてのクライアント バインドでは、タスク ハブのメタデータは host.json の設定から取得されます。

タスク ハブの名前は、先頭文字をアルファベットとする必要があります。また、使用できるのはアルファベットと数値だけです。 指定しない場合、次の表に示すように、既定のタスク ハブ名が使用されます。

Durable の拡張機能のバージョン 既定のタスク ハブ名
2.x Azure にデプロイする場合、タスク ハブ名は 関数アプリ の名前から派生します。 Azure の外部で実行する場合、既定のタスク ハブ名は TestHubName です。
1.x すべての環境で、既定のタスク ハブ名は DurableFunctionsHub です。

拡張機能のバージョン間の相違点の詳細については、Durable Functions のバージョンに関する記事を参照してください。

Note

この名前は共有ストレージ アカウント内に複数のタスク ハブがある場合に、それぞれのタスク ハブを区別するものです。 共有ストレージ アカウントを共有する関数アプリが複数ある場合、host.json ファイルでタスク ハブごとに異なる名前を明示的に構成する必要があります。 このようにしないと、複数の関数アプリがメッセージをめぐって互いに競合し、その結果、オーケストレーションが Pending または Running 状態で予期せず "スタック" するなど、未定義の動作が発生する可能性があります。

次のステップ