非同期要求-応答パターン

Azure
Azure Logic Apps

フロントエンド ホストからバックエンド処理を分離します。その場合バックエンド処理を非同期にする必要がありますが、引き続きフロントエンドには明確な応答が必要です。

コンテキストと問題

最新のアプリケーション開発では、クライアント アプリケーション (多くの場合、Web クライアント (ブラウザー) で実行されるコード) が、ビジネス ロジックと作成機能を提供するためにリモート API に依存することが通常となっています。 これらの API は、アプリケーションに直接関連している場合もあれば、サード パーティによって提供される共有サービスである場合もあります。 一般に、これらの API 呼び出しは HTTP(S) プロトコルを介して行われ、REST セマンティクスに従います。

ほとんどの場合、クライアント アプリケーションの API は、100 ミリ秒以下の単位ですばやく応答するように設計されています。 次のような多くの要因によって、応答待機時間が影響を受ける可能性があります。

  • アプリケーションのホスト スタック。
  • セキュリティ コンポーネント。
  • 呼び出し元とバックエンドの相対的な地理的位置。
  • ネットワーク インフラストラクチャ。
  • 現在の負荷。
  • 要求ペイロードのサイズ。
  • 処理キューの長さ。
  • バックエンドが要求を処理する時間。

これらのいずれかの要因によって、応答に待機時間が加わる可能性があります。 一部はバックエンドをスケール アウトすることで軽減できます。 ネットワーク インフラストラクチャなどの他の要因は、アプリケーション開発者がほとんど制御できません。 ほとんどの API では、同じ接続を介して応答が到着するまでに十分な速さで応答できます。 アプリケーション コードは、非ブロッキング方式で同期 API 呼び出しを行うことができます。これは、非同期処理の外観をしており、I/O バインド操作に推奨されます。

ただし、シナリオによっては、バックエンドによって実行される作業が、数秒単位で長時間実行される場合や、数分または数時間実行されるバックグラウンド プロセスになる場合があります。 その場合は、作業が完了するまで待機してから、要求に応答することはできません。 この状況は、すべての同期要求-応答パターンの潜在的な問題点です。

一部のアーキテクチャでは、メッセージ ブローカーを使用して要求ステージと応答ステージを分離することによって、この問題を解決しています。 この分離は、多くの場合、キュー ベースの負荷平準化パターンを使用することで実現されます。 この分離により、クライアント プロセスとバックエンド API を個別にスケーリングできます。 しかし、この分離により、クライアントで成功通知が必要になる場合に、この手順を非同期にする必要があるため、さらに複雑になります。

クライアント アプリケーションについて説明した同じ考慮事項の多くが、マイクロサービス アーキテクチャなどの分散システムにおけるサーバー間の REST API 呼び出しなどにも適用されます。

解決策

この問題の 1 つの解決策は、HTTP ポーリングを使用することです。 コールバック エンドポイントを提供したり、長時間実行する接続を使用したりするのは困難な場合があるため、クライアント側コードにはポーリングが役立ちます。 コールバックが可能な場合でも、必要な追加のライブラリとサービスによって、大幅に複雑さが加わることがあります。

  • クライアント アプリケーションは、API への同期呼び出しを行い、バックエンドで長時間実行する操作をトリガーします。

  • API は、可能な限りすばやく同期的に応答します。 それは、HTTP 202 (Accepted) 状態コードを返し、要求が処理のために受信されたことを確認します。

    注意

    API では、長時間実行されるプロセスを開始する前に、要求と実行されるアクションの両方を検証する必要があります。 要求が無効な場合、HTTP 400 (Bad Request) などのエラーコードですぐに応答します。

  • 応答は、長時間実行される操作の結果を確認するために、クライアントがポーリングできるエンドポイントを指す場所参照を保持します。

  • API では、メッセージ キューなどの別のコンポーネントに処理をオフロードします。

  • 状態エンドポイントの呼び出しが正常に行われるたびに、HTTP 200 が返されます。 作業がまだ保留中の間、状態エンドポイントは作業がまだ進行中であることを示すリソースを返します。 作業が完了したら、状態エンドポイントでは、完了を示すリソースを返すか、別のリソース URL にリダイレクトすることができます。 たとえば、非同期操作によって新しいリソースが作成された場合、状態エンドポイントはそのリソースの URL にリダイレクトします。

次の図は、一般的なフローを示しています。

非同期 HTTP 要求の要求と応答のフロー

  1. クライアントは要求を送信し、HTTP 202 (Accepted) 応答を受信します。
  2. クライアントは、HTTP GET 要求を状態エンドポイントに送信します。 作業がまだ保留中のため、この呼び出しは HTTP 200 を返します。
  3. ある時点で作業が完了すると、状態エンドポイントは、リソースにリダイレクトする 302 (Found) を返します。
  4. クライアントは、指定された URL にあるリソースをフェッチします。

問題と注意事項

  • HTTP 経由でこのパターンを実装できる多くの方法があり、すべてのアップストリーム サービスで同じセマンティクスを使用するとは限りません。 たとえば、リモート プロセスが終了していない場合、ほとんどのサービスでは GET メソッドから HTTP 202 応答を返しません。 純粋な REST セマンティクスに従うと、それらは HTTP 404 (Not Found) を返す必要があります。 この応答は、呼び出しの結果がまだ存在しないと見なす場合に意味があります。

  • HTTP 202 応答では、クライアントが応答をポーリングする場所と頻度を示す必要があります。 次の追加のヘッダーが必要です。

    ヘッダー 説明 Notes
    Location クライアントが応答状態をポーリングする必要がある URL。 この URL は、この場所でアクセス制御が必要な場合に適切なバレー キー パターンを持つ SAS トークンの可能性があります。 バレー キー パターンは、応答ポーリングを別のバックエンドにオフロードする必要がある場合にも有効です。
    Retry-After 処理が完了するタイミングの見積もり このヘッダーは、ポーリング クライアントが、再試行でバックエンドを過負荷にしないように設計されています。
  • 使用する基になるサービスに応じて、応答ヘッダーまたはペイロードを操作するために、処理プロキシやファサードを使用する必要がある場合があります。

  • 状態エンドポイントで完了時にリダイレクトする場合、サポートする正確なセマンティクスに応じて、HTTP 302 または HTTP 303 のいずれかが適切なリターン コードになります。

  • 処理が成功したら、Location ヘッダーで指定されたリソースは、200 (OK)、201 (Created)、204 (No Content) などの適切な HTTP 応答コードを返す必要があります。

  • 処理中にエラーが発生した場合、Location ヘッダーに記述されているリソース URL でエラーを保持し、そのリソースからクライアントに適切な応答コード (4xx コード) を返すことが理想的です。

  • すべてのソリューションでこのパターンを同じ方法で実装するわけではなく、一部のサービスでは、追加または代替ヘッダーを含めます。 たとえば、Azure Resource Manager では、このパターンの変更されたバリエーションを使用します。 詳細については、Azure Resource Manager の非同期操作に関するページを参照してください。

  • レガシ クライアントでは、このパターンをサポートしていない可能性があります。 その場合は、元のクライアントからの非同期処理を隠すために、非同期 API にファサードを配置する必要がある場合があります。 たとえば、このパターンをネイティブにサポートしている Azure Logic Apps を、非同期 API と同期呼び出しを行うクライアント間の統合レイヤーとして使用できます。 「webhook アクション パターンで長時間タスクを実行する」を参照してください。

  • 一部のシナリオでは、クライアントが長時間実行される要求をキャンセルする方法を提供する必要がある場合があります。 その場合、バックエンド サービスで何らかの形式のキャンセル命令をサポートしている必要があります。

このパターンを使用する状況

このパターンは次の場合に使用します。

  • コールバック エンドポイントを提供するのが困難であったり、長時間実行接続を使用すると、著しく複雑さが増したりするブラウザー アプリケーションなどのクライアント側コード。

  • クライアント側でのファイアウォールの制限のため、HTTP プロトコルのみが使用可能で、リターン サービスでコールバックを起動できないサービス呼び出し。

  • Websocket や webhook などの最新のコールバックテクノロジをサポートしていないレガシ アーキテクチャと統合する必要があるサービス呼び出し。

このパターンは、次の場合に適していない可能性があります。

  • Azure Event Grid など、非同期通知用に構築されたサービスを代わりに使用できます。
  • 応答は、クライアントにリアルタイムでストリーミングする必要があります。
  • クライアントでは多くの結果を収集する必要があり、それらの結果の受信待機時間が重要になります。 代わりに、Service Bus パターンを検討します。
  • WebSocket や SignalR などのサーバー側の永続的なネットワーク接続を使用できます。 これらのサービスを使用して、呼び出し元に結果を通知できます。
  • ネットワーク設計により、ポートを開いて非同期コールバックや webhook を受信することができます。

ワークロード設計

設計者は、Azure Well-Architected Framework の柱で説明されている目標と原則に対処するために、ワークロードの設計で非同期リクエスト・返信パターンをどのように使用できるかを評価する必要があります。 次に例を示します。

重要な要素 このパターンが柱の目標をサポートする方法
パフォーマンスの効率化は、スケーリング、データ、コードを最適化することによって、ワークロードが効率的にニーズを満たすのに役立ちます。 即答を必要としないプロセスについて、相互作用の要求フェーズと応答フェーズを分離することで、システムの応答性とスケーラビリティが向上します。 非同期のアプローチとして、サーバ側での同時性を最大限に高め、キャパシティに応じて作業を完了するようスケジュールできます。

- PE:05 スケーリングとパーティショニング
- PE:07 コードとインフラストラクチャ

設計決定と同様に、このパターンで導入される可能性のある他の柱の目標とのトレードオフを考慮してください。

次のコードは、Azure Functions を使用して、このパターンを実装するアプリケーションからの抜粋を示しています。 このソリューションには、次の 3 つの関数があります。

  • 非同期 API エンドポイント。
  • 状態エンドポイント。
  • キューに置かれた作業項目を取得して、それらを実行するバックエンド関数。

関数内の非同期要求応答パターンの構造のイメージ

GitHub ロゴ このサンプルは GitHub で入手できます。

AsyncProcessingWorkAcceptor 関数

AsyncProcessingWorkAcceptor 関数は、クライアント アプリケーションから作業を受け入れ、それを処理のためにキューに配置するエンドポイントを実装します。

  • 関数は、要求 ID を生成し、それをメタデータとしてキュー メッセージに追加します。
  • HTTP 応答には、状態エンドポイントを指す場所ヘッダーが含まれます。 要求 ID は URL パスの一部です。
public static class AsyncProcessingWorkAcceptor
{
    [FunctionName("AsyncProcessingWorkAcceptor")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] CustomerPOCO customer,
        [ServiceBus("outqueue", Connection = "ServiceBusConnectionAppSetting")] IAsyncCollector<ServiceBusMessage> OutMessages,
        ILogger log)
    {
        if (String.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
        {
            return new BadRequestResult();
        }

        string reqid = Guid.NewGuid().ToString();

        string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{reqid}";

        var messagePayload = JsonConvert.SerializeObject(customer);
        var message = new ServiceBusMessage(messagePayload);
        message.ApplicationProperties.Add("RequestGUID", reqid);
        message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.Now);
        message.ApplicationProperties.Add("RequestStatusURL", rqs);

        await OutMessages.AddAsync(message);

        return new AcceptedResult(rqs, $"Request Accepted for Processing{Environment.NewLine}ProxyStatus: {rqs}");
    }
}

AsyncProcessingBackgroundWorker 関数

AsyncProcessingBackgroundWorker 関数は、キューから操作を取得し、メッセージ ペイロードに基づいて何らかの作業を行い、その結果をストレージ アカウントに書き込みます。

public static class AsyncProcessingBackgroundWorker
{
    [FunctionName("AsyncProcessingBackgroundWorker")]
    public static async Task RunAsync(
        [ServiceBusTrigger("outqueue", Connection = "ServiceBusConnectionAppSetting")] BinaryData customer,
        IDictionary<string, object> applicationProperties,
        [Blob("data", FileAccess.ReadWrite, Connection = "StorageConnectionAppSetting")] BlobContainerClient inputContainer,
        ILogger log)
    {
        // Perform an actual action against the blob data source for the async readers to be able to check against.
        // This is where your actual service worker processing will be performed

        var id = applicationProperties["RequestGUID"] as string;

        BlobClient blob = inputContainer.GetBlobClient($"{id}.blobdata");

        // Now write the results to blob storage.
        await blob.UploadAsync(customer);
    }
}

AsyncOperationStatusChecker 関数

AsyncOperationStatusChecker 関数は、状態エンドポイントを実装します。 この関数は、まず要求が完了したかどうかをチェックします

  • 要求が完了している場合、関数は応答にバレー キーを返すか、またはその呼び出しをバレー キー URL に即時にリダイレクトします。
  • 要求がまだ保留中の場合は、現在の状態とともに 200 コードを返します。
public static class AsyncOperationStatusChecker
{
    [FunctionName("AsyncOperationStatusChecker")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{thisGUID}")] HttpRequest req,
        [Blob("data/{thisGuid}.blobdata", FileAccess.Read, Connection = "StorageConnectionAppSetting")] BlockBlobClient inputBlob, string thisGUID,
        ILogger log)
    {

        OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
        OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");

        log.LogInformation($"C# HTTP trigger function processed a request for status on {thisGUID} - OnComplete {OnComplete} - OnPending {OnPending}");

        // Check to see if the blob is present
        if (await inputBlob.ExistsAsync())
        {
            // If it's present, depending on the value of the optional "OnComplete" parameter choose what to do.
            return await OnCompleted(OnComplete, inputBlob, thisGUID);
        }
        else
        {
            // If it's NOT present, then we need to back off. Depending on the value of the optional "OnPending" parameter, choose what to do.
            string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{thisGUID}";

            switch (OnPending)
            {
                case OnPendingEnum.OK:
                    {
                        // Return an HTTP 200 status code.
                        return new OkObjectResult(new { status = "In progress", Location = rqs });
                    }

                case OnPendingEnum.Synchronous:
                    {
                        // Back off and retry. Time out if the backoff period hits one minute.
                        int backoff = 250;

                        while (!await inputBlob.ExistsAsync() && backoff < 64000)
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - retrying in {backoff} ms");
                            backoff = backoff * 2;
                            await Task.Delay(backoff);
                        }

                        if (await inputBlob.ExistsAsync())
                        {
                            log.LogInformation($"Synchronous Redirect mode {thisGUID}.blob - completed after {backoff} ms");
                            return await OnCompleted(OnComplete, inputBlob, thisGUID);
                        }
                        else
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - NOT FOUND after timeout {backoff} ms");
                            return new NotFoundResult();
                        }
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnPending}");
                    }
            }
        }
    }

    private static async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string thisGUID)
    {
        switch (OnComplete)
        {
            case OnCompleteEnum.Redirect:
                {
                    // Redirect to the SAS URI to blob storage

                    return new RedirectResult(inputBlob.GenerateSASURI());
                }

            case OnCompleteEnum.Stream:
                {
                    // Download the file and return it directly to the caller.
                    // For larger files, use a stream to minimize RAM usage.
                    return new OkObjectResult(await inputBlob.DownloadContentAsync());
                }

            default:
                {
                    throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                }
        }
    }
}

public enum OnCompleteEnum
{

    Redirect,
    Stream
}

public enum OnPendingEnum
{

    OK,
    Synchronous
}

次のステップ

このパターンを実装するときは、次の情報を参考にしてください。