次の方法で共有


非同期要求-応答パターン

バックエンド処理を非同期的に実行する必要があるが、フロントエンドが明確な応答を必要とする場合は、フロントエンド ホストからバックエンド処理を切り離します。

コンテキストと問題

最新のアプリケーション開発では、クライアント アプリケーションは多くの場合、ビジネス ロジックと作成機能を提供するためにリモート API に依存しています。 多くのアプリケーションは Web ブラウザーでコードを実行し、他の環境でもクライアント コードをホストしています。 API は、アプリケーションに直接関連している場合や、外部サービスから共有サービスとして動作する場合があります。 ほとんどの API 呼び出しでは、HTTP または HTTPS を使用し、REST セマンティクスに従います。

ほとんどの場合、クライアント アプリケーションの API は約 100 ミリ秒 (ミリ秒) 以下で応答します。 多くの要因が応答の待機時間に影響する可能性があります。

  • アプリケーションのホスティング スタック
  • セキュリティ コンポーネント
  • 呼び出し元とバックエンドの相対的な地理的位置
  • ネットワーク インフラストラクチャ
  • 現在の負荷
  • 要求ペイロードのサイズ
  • 処理キューの長さ
  • バックエンドが要求を処理する時間

これらの要因により、応答に待機時間が追加される可能性があります。 バックエンドをスケールアウトすることで、いくつかの要因を軽減できます。 ネットワーク インフラストラクチャなどのその他の要因は、アプリケーション開発者の制御外です。 ほとんどの API は、応答が同じ接続経由で返されるのに十分な速さで応答します。 アプリケーション コードは、非同期処理の外観を提供するために、非ブロッキングの方法で同期 API 呼び出しを行うことができます。 入出力 (I/O) バインド操作には、この方法をお勧めします。

一部のシナリオでは、バックエンドは実行時間が長く、数秒かかる作業を行います。 その他のシナリオでは、バックエンドは長時間実行されるバックグラウンド作業を数分または長期間実行します。 このような場合は、応答を送信する前に作業が完了するまで待つ必要はありません。 このような状況では、同期要求/応答パターンに問題が発生する可能性があります。

一部のアーキテクチャでは、メッセージ ブローカーを使用して要求ステージと応答ステージを分離することによって、この問題を解決しています。 多くのシステムでは、 Queue-Based 負荷平準化パターンによってこの分離が実現されます。 この分離により、クライアント プロセスとバックエンド API を個別にスケーリングできます。 また、クライアントが成功通知を必要とする場合は、そのステップも非同期にする必要があるため、複雑さが増します。

クライアント アプリケーションに適用される同じ考慮事項の多くは、マイクロサービス アーキテクチャなど、分散システムのサーバー間 REST API 呼び出しにも適用されます。

ソリューション

この問題の 1 つの解決策は、HTTP ポーリングを使用することです。 コールバック エンドポイントが使用できない場合、または実行時間の長い接続が複雑すぎる場合は、クライアント側のコードにポーリングが適しています。 コールバックが可能な場合でも、必要な追加のライブラリとサービスによって複雑さが増す可能性があります。

次の手順では、ソリューションについて説明します。

  • クライアント アプリケーションは API を同期的に呼び出して、バックエンドで実行時間の長い操作をトリガーします。

  • API は、可能な限りすばやく同期的に応答します。 HTTP 202 (Accepted) 状態コードを返して、処理要求を受信したことを確認します。

    API は、実行時間の長いプロセスを開始する前に、要求と実行されるアクションを検証します。 要求が有効でない場合は、HTTP 400 (無効な要求) などのエラー コードですぐに応答します。

  • 応答には、クライアントがポーリングして実行時間の長い操作の結果を確認できるエンドポイントを指す場所参照が含まれます。

  • API は、メッセージ キューなどの別のコンポーネントに処理をオフロードします。

  • 状態エンドポイントの呼び出しが成功した場合、エンドポイントは HTTP 200 (OK) を返します。 作業の進行中、エンドポイントはその状態を示すリソースを返します。 作業が完了すると、エンドポイントは完了を示すリソースを返すか、別のリソース URL にリダイレクトします。 たとえば、非同期操作で新しいリソースが作成された場合、状態エンドポイントはそのリソースの URL にリダイレクトされます。

次の図は、一般的なフローを示しています。

非同期 HTTP 要求の要求と応答フローを示す図。

クライアント、API エンドポイント、状態エンドポイント、およびリソース URI を示すシーケンス図。 クライアントは、HTTP 202 を返す POST 要求を API エンドポイントに送信します。 その後、クライアントは、状態エンドポイントに繰り返し GET 要求を送信します。 最初の応答は HTTP 200 を返し、それ以降の応答は HTTP 302 (Found) を返します。 クライアントは、HTTP 200 を返すリソース URI への GET 要求でリダイレクトに従います。 この図は、ポーリングを使用した非同期要求パターンと、完了したリソースへの最終的なリダイレクトを示しています。

  1. クライアントは要求を送信し、HTTP 202 応答を受信します。

  2. クライアントは、HTTP GET 要求を状態エンドポイントに送信します。 作業は保留中であるため、この呼び出しは HTTP 200 を返します。

  3. 作業が完了し、状態エンドポイントから HTTP 302 (Found) が返され、リソースにリダイレクトされます。

  4. クライアントは、指定された URL にあるリソースをフェッチします。

問題と考慮事項

このパターンを実装する方法を決定するときは、次の点を考慮してください。

  • HTTP 経由でこのパターンを実装する方法は複数存在し、アップストリーム サービスでは常に同じセマンティクスが使用されるわけではありません。 たとえば、ほとんどのサービスは、HTTP 202 ではなく、リモート プロセスが完了していない場合に GET メソッドから HTTP 404 (Not Found) を返します。 標準の REST セマンティクスによると、呼び出しの結果がまだ存在しないため、HTTP 404 が正しい応答です。

  • HTTP 202 応答は、クライアントがポーリングする場所と頻度を示します。 これには、次のヘッダーが含まれています。

    ヘッダー 説明 メモ
    Location クライアントが応答状態をポーリングする URL この URL には、Shared Access Signature トークンを指定できます。 バレット キー パターンは、この場所でアクセス制御が必要な場合に適切に機能します。 このパターンは、応答ポーリングを別のバックエンドに移動する必要がある場合にも適用されます。
    Retry-After 処理が完了するタイミングの見積もり このヘッダーは、ポーリングクライアントがバックエンドに対して過剰な要求を送信するのを防ぎます。

    この応答を設計するときは、予想されるクライアントの動作を検討してください。 制御するクライアントは、これらの応答値に正確に従うことができます。 コードなしのツールやAzure Logic Appsなどのローコード ツールを使用して構築されたクライアントなど、他のユーザーが作成するクライアントは、HTTP 202 に対して独自の処理を適用できます。

  • 使用する基になるサービスによっては、処理プロキシを使用して応答ヘッダーまたはペイロードを調整することが必要になる場合があります。

  • 完了後に状態エンドポイントがリダイレクトされる場合は、サポートするセマンティクスに応じて、 HTTP 302 または HTTP 303 (その他を参照) が有効なリターン コードになります。

  • サーバーが要求を処理した後、 ヘッダーで指定されたリソースは、200、201 (作成済み)、204 (コンテンツなし) などの HTTP 状態コードを返します。

  • 処理中にエラーが発生した場合は、 ヘッダーが指定したリソース URL でエラーを保持し、エラーに一致する 4xx 状態コードをそのリソースから返します。

  • ソリューションはすべて同じ方法でこのパターンを実装するわけではありません。また、一部のサービスには追加ヘッダーまたは代替ヘッダーが含まれています。 たとえば、Azure Resource Managerはこのパターンの変更されたバリアントを使用します。 詳細については、「Resource Manager非同期操作を参照してください。

  • レガシ クライアントでは、このパターンをサポートしていない可能性があります。 その場合、元のクライアントから非同期処理を非表示にするために、非同期 API に処理プロキシを配置する必要がある場合があります。 たとえば、Logic Apps ではこのパターンがネイティブにサポートされており、非同期 API と同期呼び出しを行うクライアントの間の統合レイヤーとして使用できます。 詳細については、「 Webhook アクション パターンを使用して実行時間の長いタスクを実行する」を参照してください。

  • 一部のシナリオでは、クライアントが長時間実行される要求をキャンセルする方法を提供する必要がある場合があります。 その場合、バックエンド サービスは何らかの形式のキャンセル命令をサポートする必要があります。

このパターンを使用する場合

このパターンは次の状況で使用します。

  • ブラウザー アプリケーションなどのクライアント側コードを操作すると、これらの制約によってコールバック エンドポイントの提供が困難になる、または実行時間の長い接続が複雑になりすぎる。

  • HTTP プロトコルのみを使用するサービスを呼び出すと、クライアント側のファイアウォール制限のため、リターン サービスはコールバックを送信できません。

  • WebSocket や webhook などの最新のコールバック メカニズムをサポートしていないレガシ アーキテクチャと統合します。

このパターンは、次の場合に適さない場合があります。

  • 代わりに、Azure Event Gridなどの非同期通知用に構築されたサービスを使用できます。

  • 応答は、クライアントにリアルタイムでストリーミングする必要があります。

  • クライアントは多くの結果を収集する必要があり、それらの結果の待機時間が重要です。 むしろ、サービスバスパターンの使用を検討してください。

  • WebSocket や SignalR などのサーバー側の永続的なネットワーク接続を使用できます。 これらの接続を使用して、呼び出し元に結果を通知できます。

  • ネットワーク設計では、非同期コールバックまたは Webhook を受信するためのオープン ポートがサポートされています。

ワークロード設計

アーキテクトは、ワークロードの設計で非同期 Request-Reply パターンを使用して、Azure Well-Architected Framework の柱で説明されている目標と原則に対処する方法を評価する必要があります。

支柱 このパターンが柱の目標をサポートする方法
パフォーマンス効率 は、スケーリング、データ、およびコードの最適化を通じて、ワークロード の需要を効率的に満たすのに役立ちます。 即時応答を必要としないプロセスの要求フェーズと応答フェーズを切り離すことで、応答性とスケーラビリティを向上させます。 非同期アプローチを使用すると、コンカレンシーが向上し、容量が使用可能になったときにサーバーのスケジュールが機能します。

PE:05 スケーリングとパーティショニング
PE:07 コードとインフラストラクチャ

設計上の決定と同様に、このパターンが導入する可能性がある他の柱の目標に対するトレードオフを検討してください。

次のコードは、Azure Functionsを使用してこのパターンを実装するアプリケーションからの抜粋を示しています。 このソリューションには、次の 3 つの機能があります。

  • 非同期 API エンドポイント
  • 状態エンドポイント
  • キューに置かれた作業項目を受け取って実行するバックエンド関数

Functions の非同期要求応答パターンの構造の図。

手順 1 では、クライアントが API を呼び出します。 手順 2 では、API によってキューにメッセージが配置されます。 手順 3 では、API は状態エンドポイントをクライアントに返します。 手順 4 では、ワーカーはキューからメッセージを受信します。 手順 5 では、ワーカーがメッセージを処理し、結果を BLOB ストレージに書き込みます。 手順 6 では、クライアントは状態エンドポイントを呼び出します。 手順 7 では、ステータス エンドポイントが BLOB ストレージで結果をチェックします。

GitHub logo. このサンプルは、GitHub で入手できます。

AsyncProcessingWorkAcceptor 関数

関数は、クライアント アプリケーションからの作業を受け入れ、処理のためにエンキューするエンドポイントを実装します。

  • 関数は、要求 ID を生成し、それをメタデータとしてキュー メッセージに追加します。

  • HTTP 応答には、状態エンドポイントを指す ヘッダーが含まれています。 要求 ID が URL パスに表示されます。

    public class AsyncProcessingWorkAcceptor(ServiceBusClient _serviceBusClient)
    {
        [Function("AsyncProcessingWorkAcceptor")]
        public async Task<IActionResult> RunAsync([HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] HttpRequest req, [FromBody] CustomerPOCO customer)
        {
            if (string.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
            {
                return new BadRequestResult();
            }

            var reqid = Guid.NewGuid().ToString();

            string scheme = Environment.GetEnvironmentVariable("AZURE_FUNCTIONS_ENVIRONMENT") == "Development" ? "http" : "https";
            var rqs = $"{scheme}://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{reqid}";

            var messagePayload = JsonConvert.SerializeObject(customer);
            var message = new ServiceBusMessage(messagePayload);
            message.ApplicationProperties.Add("RequestGUID", reqid);
            message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.Now);
            message.ApplicationProperties.Add("RequestStatusURL", rqs);
            var sender = _serviceBusClient.CreateSender("outqueue");

            await sender.SendMessageAsync(message);
            return new AcceptedResult(rqs, $"Request Accepted for Processing{Environment.NewLine}ProxyStatus: {rqs}");
        }
    }

AsyncProcessingBackgroundWorker 関数

関数は、キューから操作を読み取り、メッセージ ペイロードに基づいて処理し、結果をストレージ アカウントに書き込みます。

    public class AsyncProcessingBackgroundWorker(BlobContainerClient _blobContainerClient)
    {
        [Function(nameof(AsyncProcessingBackgroundWorker))]
        public async Task Run([ServiceBusTrigger("outqueue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
        {
            var requestGuid = message.ApplicationProperties["RequestGUID"].ToString();
            string blobName = $"{requestGuid}.blobdata";

            await _blobContainerClient.CreateIfNotExistsAsync();

            var blobClient = _blobContainerClient.GetBlobClient(blobName);
            using (MemoryStream memoryStream = new MemoryStream())
            using (StreamWriter writer = new StreamWriter(memoryStream))
            {
                writer.Write(message.Body.ToString());
                writer.Flush();
                memoryStream.Position = 0;

                await blobClient.UploadAsync(memoryStream, overwrite: true);
            }
        }
    }

AsyncOperationStatusChecker 関数

関数は、状態エンドポイントを実装します。 この関数は、要求の状態を確認します。

  • 要求が完了すると、関数は 応答にバレット キー を返すか、呼び出しをすぐにバレット キー URL にリダイレクトします。

  • 要求が保留中の場合、関数は現在の状態を 含む HTTP 200 コードを返します。

    public class AsyncOperationStatusChecker(ILogger<AsyncOperationStatusChecker> _logger)
    {  
        [Function("AsyncOperationStatusChecker")]
        public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{thisGUID}")] HttpRequest req,
             [BlobInput("data/{thisGUID}.blobdata", Connection = "DataStorage")] BlockBlobClient inputBlob, string thisGUID)
        {
            OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
            OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");

            _logger.LogInformation($"C# HTTP trigger function processed a request for status on {thisGUID} - OnComplete {OnComplete} - OnPending {OnPending}");

            // Check whether the blob exists.
            if (await inputBlob.ExistsAsync())
            {
                // If the blob exists, the function uses the OnComplete parameter to determine the next action.
                return await OnCompleted(OnComplete, inputBlob, thisGUID);
            }
            else
            {
                // If the blob doesn't exist, the function uses the OnPending parameter to determine the next action.
                string scheme = Environment.GetEnvironmentVariable("AZURE_FUNCTIONS_ENVIRONMENT") == "Development" ? "http" : "https";
                string rqs = $"{scheme}://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{thisGUID}";

                switch (OnPending)
                {
                    case OnPendingEnum.OK:
                        {
                            // Return an HTTP 200 status code.
                            return new OkObjectResult(new { status = "In progress", Location = rqs });
                        }

                    case OnPendingEnum.Synchronous:
                        {
                            // Back off and retry. Time out if the back-off period reaches one minute.
                            int backoff = 250;

                            while (!await inputBlob.ExistsAsync() && backoff < 64000)
                            {
                                _logger.LogInformation($"Synchronous mode {thisGUID}.blob - retrying in {backoff} ms");
                                backoff = backoff * 2;
                                await Task.Delay(backoff);
                            }

                            if (await inputBlob.ExistsAsync())
                            {
                                _logger.LogInformation($"Synchronous Redirect mode {thisGUID}.blob - completed after {backoff} ms");
                                return await OnCompleted(OnComplete, inputBlob, thisGUID);
                            }
                            else
                            {
                                _logger.LogInformation($"Synchronous mode {thisGUID}.blob - NOT FOUND after timeout {backoff} ms");
                                return new NotFoundResult();
                            }
                        }

                    default:
                        {
                            throw new InvalidOperationException($"Unexpected value: {OnPending}");
                        }
                }
            }
        }
        private async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string thisGUID)
        {
            switch (OnComplete)
            {
                case OnCompleteEnum.Redirect:

                    {
                        // The typical way to generate a shared access signature token in code requires the storage account key.
                        // If you need to use a managed identity to control access to your storage accounts in code, which is a recommended best practice, you should do so when possible.
                        // In this scenario, you don't have a storage account key, so you need to find another way to generate the shared access signatures.
                        // To generate shared access signatures, use a user delegation shared access signature. This approach lets you sign the shared access signature by using Microsoft Entra ID credentials instead of the storage account key.

                        BlobServiceClient blobServiceClient = inputBlob.GetParentBlobContainerClient().GetParentBlobServiceClient();
                        var userDelegationKey = await blobServiceClient.GetUserDelegationKeyAsync(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddDays(7));
                        // Redirect the shared access signature uniform resource identifier (URI) to blob storage.
                        return new RedirectResult(inputBlob.GenerateSASURI(userDelegationKey));
                    }

                case OnCompleteEnum.Stream:
                    {
                        // Download the file and return it directly to the caller.
                        // For larger files, use a stream to minimize RAM usage.
                        return new OkObjectResult(await inputBlob.DownloadContentAsync());
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                    }
            }
        }
    }

    public enum OnCompleteEnum
    {

        Redirect,
        Stream
    }

    public enum OnPendingEnum
    {

        OK,
        Synchronous
    }

次のクラスは、ステータス チェッカーが結果 BLOB のユーザー委任用の共有アクセス署名 (共有リソース識別子 URI) を生成するために使用する拡張メソッドを提供します。

    public static class CloudBlockBlobExtensions
    {
        public static string GenerateSASURI(this BlockBlobClient inputBlob, UserDelegationKey userDelegationKey)
        {
            BlobServiceClient blobServiceClient = inputBlob.GetParentBlobContainerClient().GetParentBlobServiceClient();

            BlobSasBuilder blobSasBuilder = new BlobSasBuilder()
            {
                BlobContainerName = inputBlob.BlobContainerName,
                BlobName = inputBlob.Name,
                Resource = "b",
                StartsOn = DateTimeOffset.UtcNow,
                ExpiresOn = DateTimeOffset.UtcNow.AddMinutes(10)
            };
            blobSasBuilder.SetPermissions(BlobSasPermissions.Read);

            var blobUriBuilder = new BlobUriBuilder(inputBlob.Uri)
            {
                Sas = blobSasBuilder.ToSasQueryParameters(userDelegationKey, blobServiceClient.AccountName)
            };

            // Generate the shared access signature on the blob, which sets the constraints directly on the signature.
            Uri sasUri = blobUriBuilder.ToUri();

            // Return the URI string for the container, including the shared access signature token.
            return sasUri.ToString();
        }
    }

次のステップ

  • 実行時間の長いタスクにポーリング アクション パターンを使用する
  • 非同期 HTTP API パターン
  • Web API の設計
  • フロントエンド用バックエンド パターン
  • バレット キー パターン