異步要求-回復模式

Azure
Azure Logic Apps

將後端處理與前端主機分離,其中後端處理必須是異步的,但前端仍然需要明確的回應。

內容和問題

在新式應用程式開發中,用戶端應用程式通常是在 Web 用戶端(瀏覽器)中執行的程式碼,依賴遠端 API 來提供商業規則和撰寫功能。 這些 API 可能與應用程式直接相關,或可能是第三方提供的共享服務。 這些 API 呼叫通常會透過 HTTP(S) 通訊協議進行,並遵循 REST 語意。

在大部分情況下,用戶端應用程式的 API 設計成以 100 毫秒或更少的順序快速回應。 許多因素可能會影響回應延遲,包括:

  • 應用程式的裝載堆疊。
  • 安全性元件。
  • 呼叫端和後端的相對地理位置。
  • 網路基礎結構。
  • 目前的負載。
  • 要求承載的大小。
  • 處理佇列長度。
  • 後端處理要求的時間。

這些因素中的任何一個都可以將延遲新增至回應。 有些可以藉由相應放大後端來減輕。 其他,例如網路基礎結構,基本上無法控制應用程式開發人員。 大部分的 API 可以快速回應,讓回應透過相同的連線傳回。 應用程式程式代碼可以透過非封鎖方式進行同步 API 呼叫,以呈現異步處理的外觀,這是 I/O 系結作業的建議。

不過,在某些情況下,後端所完成的工作可能會長時間執行、依秒順序執行,或可能是以分鐘或甚至數小時執行的背景進程。 在此情況下,在回應要求之前,等候工作完成並不可行。 這種情況是任何同步要求-回復模式的潛在問題。

某些架構會使用訊息代理程式來分隔要求和響應階段,以解決此問題。 此區隔通常是使用 佇列型負載撫平模式來達成。 此區隔可讓客戶端進程和後端 API 獨立調整。 但是,當用戶端需要成功通知時,此區隔也會帶來額外的複雜性,因為此步驟需要變成異步。

用戶端應用程式所討論的許多相同考慮也適用於分散式系統中的伺服器對伺服器 REST API 呼叫,例如,在微服務架構中。

解決方案

此問題的其中一個解決方案是使用 HTTP 輪詢。 輪詢對客戶端程式代碼很有用,因為很難提供回呼端點或使用長時間執行的連線。 即使可能回呼,所需的額外連結庫和服務有時會增加太多額外的複雜度。

  • 用戶端應用程式對 API 進行同步呼叫,並在後端觸發長時間執行的作業。

  • API 會儘快同步回應。 它會傳回 HTTP 202(已接受)狀態代碼,並確認已收到要求進行處理。

    注意

    API 應該先驗證要求和要執行的動作,再啟動長時間執行的進程。 如果要求無效,請立即回復錯誤碼,例如 HTTP 400(不正確的要求)。

  • 回應會保存位置參考,指向用戶端可以輪詢的端點,以檢查長時間執行作業的結果。

  • API 會將處理卸除至另一個元件,例如消息佇列。

  • 針對每個成功呼叫狀態端點,它會傳回 HTTP 200。 當工作仍在擱置中時,狀態端點會傳回資源,指出工作仍在進行中。 工作完成後,狀態端點可以傳回表示完成的資源,或重新導向至另一個資源 URL。 例如,如果異步操作建立新的資源,狀態端點會重新導向至該資源的URL。

下圖顯示一般流程:

異步 HTTP 要求的要求和回應流程

  1. 用戶端會傳送要求並接收 HTTP 202(已接受)回應。
  2. 用戶端會將 HTTP GET 要求傳送至狀態端點。 工作仍在擱置中,因此此呼叫會傳回 HTTP 200。
  3. 在某些時候,工作已完成,狀態端點會傳回 302 (Found) 重新導向至資源。
  4. 用戶端會在指定的 URL 擷取資源。

問題和考慮

  • 透過 HTTP 實作此模式的方法有很多種,並非所有上游服務都有相同的語意。 例如,當遠端進程尚未完成時,大部分服務都不會從 GET 方法傳回 HTTP 202 回應。 在純 REST 語意之後,它們應該會傳回 HTTP 404(找不到)。 當您考慮呼叫的結果尚未出現時,此回應是合理的。

  • HTTP 202 回應應該指出客戶端應該輪詢回應的位置和頻率。 它應該有下列額外的標頭:

    標頭 描述 附註
    Location 用戶端應該輪詢回應狀態的 URL。 如果此位置需要訪問控制, 此 URL 可能是具有代客金鑰模式 的 SAS 令牌。 當回應輪詢需要卸除至另一個後端時,代客密鑰模式也有效
    Retry-After 處理何時完成的估計 此標頭的設計目的是防止輪詢用戶端因重試而壓倒後端。
  • 您可能需要使用處理 Proxy 或外觀,根據所使用的基礎服務來操作回應標頭或承載。

  • 如果狀態端點在完成時重新導向, 則 HTTP 302HTTP 303 都是適當的傳回碼,視您支援的確切語意而定。

  • 成功處理時,Location 標頭所指定的資源應該會傳回適當的 HTTP 回應碼,例如 200 (確定)、201(已建立),或 204 (無內容)。

  • 如果在處理期間發生錯誤,請在位置標頭中所述的資源 URL 上保存錯誤,並在理想情況下,從該資源將適當的回應碼傳回給用戶端(4xx 程式代碼)。

  • 並非所有解決方案都會以相同的方式實作此模式,而且某些服務會包含其他或替代標頭。 例如,Azure Resource Manager 會使用此模式的修改變體。 如需詳細資訊,請參閱 Azure Resource Manager 異步操作

  • 舊版用戶端可能不支援此模式。 在此情況下,您可能需要在異步 API 上放置外觀,以隱藏原始用戶端的異步處理。 例如,Azure Logic Apps 以原生方式支援此模式,可作為異步 API 與進行同步呼叫的用戶端之間的整合層。 請參閱 使用 Webhook 動作模式執行長時間執行的工作。

  • 在某些情況下,您可能想要為用戶端提供取消長時間執行要求的方式。 在此情況下,後端服務必須支持某種形式的取消指示。

使用此模式的時機

使用下列模式:

  • 用戶端程序代碼,例如瀏覽器應用程式,很難提供回呼端點,或使用長時間執行的聯機會增加太多額外的複雜度。

  • 只有 HTTP 通訊協定可用且傳回服務因用戶端上的防火牆限制而無法引發回呼的服務呼叫。

  • 需要與不支援 WebSocket 或 Webhook 等新式回呼技術之舊版架構整合的服務呼叫。

當下列情況時,此模式可能不適合:

  • 您可以改用針對異步通知建置的服務,例如 Azure 事件方格。
  • 回應必須即時串流至用戶端。
  • 用戶端需要收集許多結果,而且收到這些結果的延遲很重要。 請改為考慮服務總線模式。
  • 您可以使用伺服器端持續性網路連線,例如 WebSocket 或 SignalR。 這些服務可用來通知呼叫端結果。
  • 網路設計可讓您開啟埠以接收異步回呼或 Webhook。

工作負載設計

架構設計人員應該評估如何在工作負載的設計中使用異步要求-回復模式,以解決 Azure 架構良好架構支柱涵蓋的目標和原則。 例如:

要素 此模式如何支援支柱目標
效能效率 可透過調整、數據、程式代碼的優化,有效率地協助您的工作負載 符合需求 將不需要立即解答之處理程式的要求和回復階段分離,可改善系統的回應性和延展性。 身為異步 appproach,您可以將伺服器端的並行存取最大化,並將工作排程在容量允許時完成。

- PE:05 調整和分割
- PE:07 程式代碼和基礎結構

如同任何設計決策,請考慮對其他可能以此模式導入之目標的任何取捨。

範例

下列程式代碼顯示使用 Azure Functions 實作此模式之應用程式的摘錄。 解決方案中有三個函式:

  • 異步 API 端點。
  • 狀態端點。
  • 後端函式,接受已排入佇列的工作專案並加以執行。

函式中異步要求回復模式結構的影像

GitHub 標誌此範例可在 GitHub取得。

AsyncProcessingWorkAcceptor 函式

AsyncProcessingWorkAcceptor 式會實作可接受用戶端應用程式工作的端點,並將它放在佇列中進行處理。

  • 函式會產生要求標識碼,並將它新增為佇列訊息的元數據。
  • HTTP 回應包含指向狀態端點的位置標頭。 要求標識碼是 URL 路徑的一部分。
public static class AsyncProcessingWorkAcceptor
{
    [FunctionName("AsyncProcessingWorkAcceptor")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] CustomerPOCO customer,
        [ServiceBus("outqueue", Connection = "ServiceBusConnectionAppSetting")] IAsyncCollector<ServiceBusMessage> OutMessages,
        ILogger log)
    {
        if (String.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
        {
            return new BadRequestResult();
        }

        string reqid = Guid.NewGuid().ToString();

        string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{reqid}";

        var messagePayload = JsonConvert.SerializeObject(customer);
        var message = new ServiceBusMessage(messagePayload);
        message.ApplicationProperties.Add("RequestGUID", reqid);
        message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.Now);
        message.ApplicationProperties.Add("RequestStatusURL", rqs);

        await OutMessages.AddAsync(message);

        return new AcceptedResult(rqs, $"Request Accepted for Processing{Environment.NewLine}ProxyStatus: {rqs}");
    }
}

AsyncProcessingBackgroundWorker 函式

AsyncProcessingBackgroundWorker 式會從佇列中挑選作業、根據訊息承載執行某些工作,並將結果寫入記憶體帳戶。

public static class AsyncProcessingBackgroundWorker
{
    [FunctionName("AsyncProcessingBackgroundWorker")]
    public static async Task RunAsync(
        [ServiceBusTrigger("outqueue", Connection = "ServiceBusConnectionAppSetting")] BinaryData customer,
        IDictionary<string, object> applicationProperties,
        [Blob("data", FileAccess.ReadWrite, Connection = "StorageConnectionAppSetting")] BlobContainerClient inputContainer,
        ILogger log)
    {
        // Perform an actual action against the blob data source for the async readers to be able to check against.
        // This is where your actual service worker processing will be performed

        var id = applicationProperties["RequestGUID"] as string;

        BlobClient blob = inputContainer.GetBlobClient($"{id}.blobdata");

        // Now write the results to blob storage.
        await blob.UploadAsync(customer);
    }
}

AsyncOperationStatusChecker 函式

AsyncOperationStatusChecker 式會實作狀態端點。 此函式會先檢查要求是否已完成

  • 如果要求已完成,函式會傳回代客密鑰給回應,或立即將呼叫重新導向至代客密鑰 URL。
  • 如果要求仍在擱置中,則我們應該傳回 200 個 程式代碼,包括目前的狀態
public static class AsyncOperationStatusChecker
{
    [FunctionName("AsyncOperationStatusChecker")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{thisGUID}")] HttpRequest req,
        [Blob("data/{thisGuid}.blobdata", FileAccess.Read, Connection = "StorageConnectionAppSetting")] BlockBlobClient inputBlob, string thisGUID,
        ILogger log)
    {

        OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
        OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");

        log.LogInformation($"C# HTTP trigger function processed a request for status on {thisGUID} - OnComplete {OnComplete} - OnPending {OnPending}");

        // Check to see if the blob is present
        if (await inputBlob.ExistsAsync())
        {
            // If it's present, depending on the value of the optional "OnComplete" parameter choose what to do.
            return await OnCompleted(OnComplete, inputBlob, thisGUID);
        }
        else
        {
            // If it's NOT present, then we need to back off. Depending on the value of the optional "OnPending" parameter, choose what to do.
            string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{thisGUID}";

            switch (OnPending)
            {
                case OnPendingEnum.OK:
                    {
                        // Return an HTTP 200 status code.
                        return new OkObjectResult(new { status = "In progress", Location = rqs });
                    }

                case OnPendingEnum.Synchronous:
                    {
                        // Back off and retry. Time out if the backoff period hits one minute.
                        int backoff = 250;

                        while (!await inputBlob.ExistsAsync() && backoff < 64000)
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - retrying in {backoff} ms");
                            backoff = backoff * 2;
                            await Task.Delay(backoff);
                        }

                        if (await inputBlob.ExistsAsync())
                        {
                            log.LogInformation($"Synchronous Redirect mode {thisGUID}.blob - completed after {backoff} ms");
                            return await OnCompleted(OnComplete, inputBlob, thisGUID);
                        }
                        else
                        {
                            log.LogInformation($"Synchronous mode {thisGUID}.blob - NOT FOUND after timeout {backoff} ms");
                            return new NotFoundResult();
                        }
                    }

                default:
                    {
                        throw new InvalidOperationException($"Unexpected value: {OnPending}");
                    }
            }
        }
    }

    private static async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string thisGUID)
    {
        switch (OnComplete)
        {
            case OnCompleteEnum.Redirect:
                {
                    // Redirect to the SAS URI to blob storage

                    return new RedirectResult(inputBlob.GenerateSASURI());
                }

            case OnCompleteEnum.Stream:
                {
                    // Download the file and return it directly to the caller.
                    // For larger files, use a stream to minimize RAM usage.
                    return new OkObjectResult(await inputBlob.DownloadContentAsync());
                }

            default:
                {
                    throw new InvalidOperationException($"Unexpected value: {OnComplete}");
                }
        }
    }
}

public enum OnCompleteEnum
{

    Redirect,
    Stream
}

public enum OnPendingEnum
{

    OK,
    Synchronous
}

下一步

實作此模式時,可能會有下列相關信息: