Durable Functions 中的展開傳送/收合傳送情節 - 雲端備份範例
本文內容
「展開傳送/收合傳送」 是指同時執行多個函式,然後對結果執行一些彙總的模式。 本文以一個範例說明如何使用 Durable Functions 來實作展開傳送/收合傳送情節。 範例是一個永久性函式,可將應用程式的所有或部分網站內容備份至 Azure 儲存體。
注意
適用於 Azure Functions 的第 4 版 Node.js 程式設計模型已正式推出。 新的 v4 模型旨在為 JavaScript 和 TypeScript 開發人員提供更靈活的直覺式體驗。 如需深入了解 v3 與 v4 之間的差異,請參閱移轉指南 。
在下列程式碼片段中,JavaScript (PM4) 表示程式設計模型 V4,這是新的體驗。
必要條件
案例概觀
在此範例中,函式會將指定目錄中的所有檔案,以遞迴方式上傳至 blob 儲存體。 還會計算已上傳的位元組總數。
您可以只撰寫一個函式來處理這一切。 您會遇到的主要問題是延展性 。 單一函式執行只能在單一虛擬機器上執行,所以輸送量受限於該單一 VM 的輸送量。 另一個問題是可靠性 。 如果半途失敗,或整個程序耗時超過 5 分鐘,備份可能會失敗,而處於只有部分完成的狀態。 於是就必須重新啟動。
更強固的方法是撰寫兩個一般函式:其中一個會列舉檔案,並將檔案名稱加入佇列中,另一個會讀取佇列,並將檔案上傳至 blob 儲存體。 此方法有較高的輸送量和可靠性,但需要您佈建及管理佇列。 更重要的是,如果您想要再多一些功能,例如報告已上傳的位元組總數,則在狀態管理 和協調 方面會變得相當複雜。
Durable Functions 方法提供上述所有優點,而且額外負荷極低。
函式
本文說明範例應用程式中的函式如下:
E2_BackupSiteContent
:協調器函式 ,呼叫 E2_GetFileList
以取得要備份的檔案清單,然後呼叫 E2_CopyFileToBlob
來備份每個檔案。
E2_GetFileList
:傳回目錄中檔案清單的活動函式 。
E2_CopyFileToBlob
:將單一檔案備份至 Azure Blob 儲存體的活動函式。
E2_BackupSiteContent 協調器函式
此協調器函式基本上會執行下列動作:
接受 rootDirectory
值作為輸入參數。
呼叫函式以取得 rootDirectory
下的檔案遞迴清單。
執行多次平行函式呼叫,將每個檔案上傳到 Azure Blob 儲存體。
等候所有上傳完成。
傳回已上傳到 Azure Blob 儲存體的位元組總數。
以下是實作協調器函式的程式碼:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
請注意 await Task.WhenAll(tasks);
這一行。 未 等待 E2_CopyFileToBlob
函式的所有個別呼叫,這可使其平行執行。 將這一批工作傳給 Task.WhenAll
時,將會傳回一個「直到所有複製作業都完成」 才會完成的工作。 如果您熟悉 .NET 中的工作平行程式庫 (TPL),則對此不會感到陌生。 差別在於,這些工作可以在多個虛擬機器上同時執行,而 Durable Functions 擴充可確保端對端執行在處理序回收的情況下迅速恢復。
結束等候 Task.WhenAll
之後,就可知道所有函式呼叫已完成,而值也已傳回給我們。 每次呼叫 E2_CopyFileToBlob
都會傳回已上傳的位元組數,因此,只要合計所有這些傳回值,就能算出位元組總數。
函式使用協調器函式的標準 function.json 。
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
以下是實作協調器函式的程式碼:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
請注意 yield context.df.Task.all(tasks);
這一行。 未 產生 E2_CopyFileToBlob
函式的所有個別呼叫,這可使其平行執行。 將這一批工作傳給 context.df.Task.all
時,將會傳回一個「直到所有複製作業都完成」 才會完成的工作。 如果您熟悉 JavaScript 中的 Promise.all
,則您不會感到陌生。 差別在於,這些工作可以在多個虛擬機器上同時執行,而 Durable Functions 擴充可確保端對端執行在處理序回收的情況下迅速恢復。
注意
儘管工作在概念上類似於 JavaScript Promise,協調器函式還是應該使用 context.df.Task.all
和 context.df.Task.any
,而不是使用 Promise.all
和 Promise.race
來管理工作平行處理。
從 context.df.Task.all
產生之後,就可知道所有函式呼叫已完成,而值也已傳回給我們。 每次呼叫 E2_CopyFileToBlob
都會傳回已上傳的位元組數,因此,只要合計所有這些傳回值,就能算出位元組總數。
以下是實作協調器函式的程式碼:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
請注意 yield context.df.Task.all(tasks);
這一行。 未 產生 copyFileToBlob
函式的所有個別呼叫,這可使其平行執行。 將這一批工作傳給 context.df.Task.all
時,將會傳回一個「直到所有複製作業都完成」 才會完成的工作。 如果您熟悉 JavaScript 中的 Promise.all
,則您不會感到陌生。 差別在於,這些工作可以在多個虛擬機器上同時執行,而 Durable Functions 擴充可確保端對端執行在處理序回收的情況下迅速恢復。
注意
儘管工作在概念上類似於 JavaScript Promise,協調器函式還是應該使用 context.df.Task.all
和 context.df.Task.any
,而不是使用 Promise.all
和 Promise.race
來管理工作平行處理。
從 context.df.Task.all
產生之後,就可知道所有函式呼叫已完成,而值也已傳回給我們。 每次呼叫 copyFileToBlob
都會傳回已上傳的位元組數,因此,只要合計所有這些傳回值,就能算出位元組總數。
函式使用協調器函式的標準 function.json 。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
以下是實作協調器函式的程式碼:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
請注意 yield context.task_all(tasks);
這一行。 未 產生 E2_CopyFileToBlob
函式的所有個別呼叫,這可使其平行執行。 將這一批工作傳給 context.task_all
時,將會傳回一個「直到所有複製作業都完成」 才會完成的工作。 如果您熟悉 Python 中的 asyncio.gather
,則您不會感到陌生。 差別在於,這些工作可以在多個虛擬機器上同時執行,而 Durable Functions 擴充可確保端對端執行在處理序回收的情況下迅速恢復。
注意
雖然工作在概念上與 Python 可等候功能類似,協調器函式應該使用 yield
和 context.task_all
與 context.task_any
API 來管理工作平行處理。
從 context.task_all
產生之後,就可知道所有函式呼叫已完成,而值也已傳回給我們。 每次呼叫 E2_CopyFileToBlob
都會傳回已上傳的位元組數,因此,只要合計所有這些傳回值,我們就能算出位元組總數。
協助程式活動函式
如同其他範例一樣,協助程式活動函式只不過是使用 activityTrigger
觸發程序繫結的一般函式。
E2_GetFileList 活動函式
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
E2_GetFileList
的 function.json 檔案看起來像下面這樣:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
實作如下:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
函式會 readdirp
使用模組 (2.x 版) 以遞迴方式讀取目錄結構。
以下是 getFileList
活動函式的實作:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
此函式使用 readdirp
模組 (版本 3.x
),以遞迴方式讀取目錄結構。
E2_GetFileList
的 function.json 檔案看起來像下面這樣:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
實作如下:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
注意
您可能覺得奇怪,為何不能直接將此程式碼放入協調器函式中。 您可以這樣做,但這會違反協調器函式的基本規則之一,也就是永遠都不該執行 I/O,包括本機檔案系統存取。 如需詳細資訊,請參閱協調器函式程式碼條件約束 。
E2_CopyFileToBlob 活動函式
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
注意
您必須安裝 Microsoft.Azure.WebJobs.Extensions.Storage
NuGet 封裝來執行該範例程式碼。
函式會使用 Azure Functions 繫結的一些進階功能 (也就是使用 Binder
參數 ),但基於本逐步解說的目的,您不必擔心這些細節。
E2_CopyFileToBlob
的 function.json 檔案也一樣簡單:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
JavaScript 實作會使用適用於節點的 Azure 儲存體 SDK ,將檔案上傳至Azure Blob 儲存體。
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
copyFileToBlob
的 JavaScript 實作使用 Azure 儲存體輸出繫結,將檔案上傳至 Azure Blob 儲存體。
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
E2_CopyFileToBlob
的 function.json 檔案也一樣簡單:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
Python 實作會使用適用於 Python 的 Azure 儲存體 SDK ,將檔案上傳至Azure Blob 儲存體。
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
實作會從磁碟載入檔案,並以非同步方式將內容串流至 "backups" 容器中相同名稱的 blob。 傳回值是複製到儲存體的位元組數,協調器函式接著會利用此值來計算的總數。
注意
這是將 I/O 作業移入 activityTrigger
函式中的最佳範例。 不僅可將工作分散至許多不同的機器,還可讓您享受到檢查點檢查進度的好處。 如果主機處理序由於任何原因而終止,您會知道哪些上傳已完成。
執行範例
您可以在 Windows 上藉由傳送下列 HTTP POST 要求來啟動協調流程。
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
或者,在 Linux 函式應用程式上 (Python 目前只在 Linux 上執行App Service),您可以啟動協調流程,如下所示:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
注意
您叫用的 HttpStart
函式僅適用於 JSON 格式的內容。 因此,需要 Content-Type: application/json
標頭,而且目錄路徑會編碼為 JSON 字串。 此外,HTTP 程式碼片段假設 host.json
檔案中有一個項目會從所有的 HTTP 觸發程序函式 URL 中移除預設 api/
前置詞。 您可以在範例的 host.json
檔案中找到此組態的標記。
此 HTTP 要求將會觸發 E2_BackupSiteContent
協調器並傳遞字串 D:\home\LogFiles
當作參數。 回應提供的連結可取得備份作業的狀態:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
根據您在應用程式函式中有多少記錄檔而定,這項作業可能需要幾分鐘才能完成。 您可以在前一個 HTTP 202 回應的 Location
標頭中查詢 URL,以取得最新狀態。
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
在此情況下,函式仍在執行中。 您可以看到已儲存至協調器狀態的輸入,以及上次更新時間。 您可以繼續使用 Location
標頭值來輪詢是否已完成。 當狀態為 "Completed" 時,您會看到類似下列的 HTTP 回應值:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
現在,您可以看到協調流程已完成,以及大約耗費多少時間完成。 您還會看到 output
欄位的值,這指出大約已上傳 450 KB 的記錄。
下一步
此範例已說明如何實作展開傳送/收合傳送模式。 下一個範例示範如何使用長期計時器 來實作監視模式。