Durable Functions 中的展開傳送/收合傳送情節 - 雲端備份範例

「展開傳送/收合傳送」是指同時執行多個函式,然後對結果執行一些彙總的模式。 本文以一個範例說明如何使用 Durable Functions 來實作展開傳送/收合傳送情節。 範例是一個永久性函式,可將應用程式的所有或部分網站內容備份至 Azure 儲存體。

注意

適用於 Azure Functions 的第 4 版 Node.js 程式設計模型已正式推出。 新的 v4 模型旨在為 JavaScript 和 TypeScript 開發人員提供更靈活的直覺式體驗。 如需深入了解 v3 與 v4 之間的差異,請參閱移轉指南

在下列程式碼片段中,JavaScript (PM4) 表示程式設計模型 V4,這是新的體驗。

必要條件

案例概觀

在此範例中,函式會將指定目錄中的所有檔案,以遞迴方式上傳至 blob 儲存體。 還會計算已上傳的位元組總數。

您可以只撰寫一個函式來處理這一切。 您會遇到的主要問題是延展性。 單一函式執行只能在單一虛擬機器上執行,所以輸送量受限於該單一 VM 的輸送量。 另一個問題是可靠性。 如果半途失敗,或整個程序耗時超過 5 分鐘,備份可能會失敗,而處於只有部分完成的狀態。 於是就必須重新啟動。

更強固的方法是撰寫兩個一般函式:其中一個會列舉檔案,並將檔案名稱加入佇列中,另一個會讀取佇列,並將檔案上傳至 blob 儲存體。 此方法有較高的輸送量和可靠性,但需要您佈建及管理佇列。 更重要的是,如果您想要再多一些功能,例如報告已上傳的位元組總數,則在狀態管理協調方面會變得相當複雜。

Durable Functions 方法提供上述所有優點,而且額外負荷極低。

函式

本文說明範例應用程式中的函式如下:

  • E2_BackupSiteContent協調器函式,呼叫 E2_GetFileList 以取得要備份的檔案清單,然後呼叫 E2_CopyFileToBlob 來備份每個檔案。
  • E2_GetFileList:傳回目錄中檔案清單的活動函式
  • E2_CopyFileToBlob:將單一檔案備份至 Azure Blob 儲存體的活動函式。

E2_BackupSiteContent 協調器函式

此協調器函式基本上會執行下列動作:

  1. 接受 rootDirectory 值作為輸入參數。
  2. 呼叫函式以取得 rootDirectory 下的檔案遞迴清單。
  3. 執行多次平行函式呼叫,將每個檔案上傳到 Azure Blob 儲存體。
  4. 等候所有上傳完成。
  5. 傳回已上傳到 Azure Blob 儲存體的位元組總數。

以下是實作協調器函式的程式碼:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

請注意 await Task.WhenAll(tasks); 這一行。 未等待 E2_CopyFileToBlob 函式的所有個別呼叫,這可使其平行執行。 將這一批工作傳給 Task.WhenAll 時,將會傳回一個「直到所有複製作業都完成」才會完成的工作。 如果您熟悉 .NET 中的工作平行程式庫 (TPL),則對此不會感到陌生。 差別在於,這些工作可以在多個虛擬機器上同時執行,而 Durable Functions 擴充可確保端對端執行在處理序回收的情況下迅速恢復。

結束等候 Task.WhenAll 之後,就可知道所有函式呼叫已完成,而值也已傳回給我們。 每次呼叫 E2_CopyFileToBlob 都會傳回已上傳的位元組數,因此,只要合計所有這些傳回值,就能算出位元組總數。

協助程式活動函式

如同其他範例一樣,協助程式活動函式只不過是使用 activityTrigger 觸發程序繫結的一般函式。

E2_GetFileList 活動函式

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

注意

您可能覺得奇怪,為何不能直接將此程式碼放入協調器函式中。 您可以這樣做,但這會違反協調器函式的基本規則之一,也就是永遠都不該執行 I/O,包括本機檔案系統存取。 如需詳細資訊,請參閱協調器函式程式碼條件約束

E2_CopyFileToBlob 活動函式

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

注意

您必須安裝 Microsoft.Azure.WebJobs.Extensions.Storage NuGet 封裝來執行該範例程式碼。

函式會使用 Azure Functions 繫結的一些進階功能 (也就是使用 Binder 參數),但基於本逐步解說的目的,您不必擔心這些細節。

實作會從磁碟載入檔案,並以非同步方式將內容串流至 "backups" 容器中相同名稱的 blob。 傳回值是複製到儲存體的位元組數,協調器函式接著會利用此值來計算的總數。

注意

這是將 I/O 作業移入 activityTrigger 函式中的最佳範例。 不僅可將工作分散至許多不同的機器,還可讓您享受到檢查點檢查進度的好處。 如果主機處理序由於任何原因而終止,您會知道哪些上傳已完成。

執行範例

您可以在 Windows 上藉由傳送下列 HTTP POST 要求來啟動協調流程。

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

或者,在 Linux 函式應用程式上 (Python 目前只在 Linux 上執行App Service),您可以啟動協調流程,如下所示:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

注意

您叫用的 HttpStart函式僅適用於 JSON 格式的內容。 因此,需要 Content-Type: application/json 標頭,而且目錄路徑會編碼為 JSON 字串。 此外,HTTP 程式碼片段假設 host.json 檔案中有一個項目會從所有的 HTTP 觸發程序函式 URL 中移除預設 api/ 前置詞。 您可以在範例的 host.json 檔案中找到此組態的標記。

此 HTTP 要求將會觸發 E2_BackupSiteContent 協調器並傳遞字串 D:\home\LogFiles 當作參數。 回應提供的連結可取得備份作業的狀態:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

根據您在應用程式函式中有多少記錄檔而定,這項作業可能需要幾分鐘才能完成。 您可以在前一個 HTTP 202 回應的 Location 標頭中查詢 URL,以取得最新狀態。

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

在此情況下,函式仍在執行中。 您可以看到已儲存至協調器狀態的輸入,以及上次更新時間。 您可以繼續使用 Location 標頭值來輪詢是否已完成。 當狀態為 "Completed" 時,您會看到類似下列的 HTTP 回應值:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

現在,您可以看到協調流程已完成,以及大約耗費多少時間完成。 您還會看到 output 欄位的值,這指出大約已上傳 450 KB 的記錄。

下一步

此範例已說明如何實作展開傳送/收合傳送模式。 下一個範例示範如何使用長期計時器來實作監視模式。