什麼是 Durable Functions?
Durable Functions 是 Azure Functions 的一項功能,可讓您在無伺服器計算環境中撰寫具狀態函式。 此擴充功能可讓您使用 Azure Functions 程式設計模型,藉由撰寫協調器函式來定義具狀態的工作流程,以及撰寫實體函式來定義具狀態的實體。 在幕後,延伸模組會為您管理狀態、檢查點和重新啟動,讓您可以專注於商務邏輯。
支援的語言
Durable Functions 設計為可搭配所有的 Azure Functions 程式設計語言使用,但對每個語言可能有不同的最低需求。 下表顯示最低支援的應用程式設定:
語言堆疊 | Azure Functions 執行階段版本 | 語言背景工作角色版本 | 搭售方案最低版本 |
---|---|---|---|
.NET / C# / F# | Functions 1.0+ | 同處理序 跨處理序 |
n/a |
JavaScript/TypeScript (V3 prog. 模型) | Functions 2.0+ | Node 8+ | 2.x 搭售方案 |
JavaScript/TypeScript (V4 prog. 模型) | Functions 4.25+ | Node 18+ | 3.15+ 套件組合 |
Python | Functions 2.0+ | Python 3.7+ | 2.x 搭售方案 |
Python (V2 prog. 模型) | Functions 4.0+ | Python 3.7+ | 3.15+ 套件組合 |
PowerShell | Functions 3.0+ | PowerShell 7+ | 2.x 搭售方案 |
Java | Functions 4.0+ | Java 8+ | 4.x 搭售方案 |
重要
本文使用索引標籤來支援多個版本的 Node.js 程式設計模型。 v4 模型已正式推出,旨在為 JavaScript 和 TypeScript 開發人員提供更靈活且更直覺的體驗。 如需 v4 模型運作方式的更多詳細資料,請參閱 Azure Functions Node.js 開發人員指南。 若要深入了解 v3 與 v4 之間的差異,請參閱移轉指南。
重要
本文使用索引標籤來支援多個版本的 Python 程序設計模型。 v2 模型已正式推出,其設計目的是要透過裝飾項目,提供更以程式碼為中心的方式撰寫函式。 如需 v2 模型運作方式的更多詳細資料,請參閱 Azure Functions Python 開發人員指南。
如同 Azure Functions,有一些範本可協助您使用 Visual Studio、Visual Studio Code 和 Azure 入口網站開發 Durable Functions。
應用程式模式
Durable Functions 的主要使用案例是簡化無伺服器應用程式中的複雜、具狀態協調需求。 下列各節描述可受益於 Durable Functions 的典型應用程式模式:
模式 #1:函式鏈結
函式鏈結模式會按照特定順序執行一連串的函式。 在此模式中,一個函式的輸出會套用至另一個函式的輸入。 在每個函式間使用佇列,可確保系統耐用且可調整,即使一個函式到下一個函式間有控制流程。
您可以使用 Durable Functions 簡潔地實作如下列範例所示的函式鏈結模式。
在此範例中,F1
、F2
、F3
和 F4
值是相同函式應用程式中其他函式的名稱。 您可以使用一般命令式編碼建構來實作控制流程。 程式碼從上到下執行。 程式碼可以涉及現有的語言控制流程語意,例如條件和迴圈。 您可以在 try
/catch
/finally
區塊中包含錯誤處理邏輯。
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
您可以使用 context
參數依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每當程式碼呼叫 await
時,Durable Functions 架構便會對目前函式執行個體的進度設定檢查點。 如果處理序或虛擬機器在執行途中回收,函式執行個體便會從先前的 await
呼叫繼續執行。 如需詳細資訊,請參閱下一節:模式 #2:展開傳送/收合傳送。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
您可以使用 context.df
物件依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每當程式碼呼叫 yield
時,Durable Functions 架構便會對目前函式執行個體的進度設定檢查點。 如果處理序或虛擬機器在執行途中回收,函式執行個體便會從先前的 yield
呼叫繼續執行。 如需詳細資訊,請參閱下一節:模式 #2:展開傳送/收合傳送。
注意
JavaScript 中的 context
物件代表整個函式內容。 請使用主要內容上的 df
屬性來存取 Durable Functions 內容。
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
您可以使用 context
物件依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每當程式碼呼叫 yield
時,Durable Functions 架構便會對目前函式執行個體的進度設定檢查點。 如果處理序或虛擬機器在執行途中回收,函式執行個體便會從先前的 yield
呼叫繼續執行。 如需詳細資訊,請參閱下一節:模式 #2:展開傳送/收合傳送。
注意
Python 中的 context
物件代表協調流程內容。 使用協調流程內容上的 function_context
屬性,存取主要的 Azure Functions 內容。
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
您可以使用 Invoke-DurableActivity
命令依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 每當程式碼在沒有 NoWait
參數的情況下呼叫 Invoke-DurableActivity
時,Durable Functions 架構便會對目前函式執行個體的進度設定檢查點。 如果處理序或虛擬機器在執行途中回收,函式執行個體便會從先前的 Invoke-DurableActivity
呼叫繼續執行。 如需詳細資訊,請參閱下一節:模式 #2:展開傳送/收合傳送。
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
您可以使用 ctx
物件依名稱叫用其他函式、傳遞參數,以及傳回函式輸出。 這些方法的輸出是 Task<V>
物件,其中 V
是已叫用函式所傳回的資料類型。 每當您呼叫 Task<V>.await()
時,Durable Functions 架構便會對目前函式執行個體的進度設定檢查點。 如果處理序非預期地在執行途中回收,函式執行個體便會從先前的 Task<V>.await()
呼叫繼續執行。 如需詳細資訊,請參閱下一節:模式 #2:展開傳送/收合傳送。
模式 #2:展開傳送/收合傳送
在展開傳送/收合傳送模式中,您會以平行方式執行多個函式,然後等候所有函式完成。 某些彙總工作通常會在函式傳回結果時執行。
透過一般函式,您可以讓函式將多則訊息傳送至佇列,來進行展開傳送。 反過來的收合傳送則困難得多。 若要收合傳送,您可撰寫程式碼來追蹤佇列所觸發的函式何時結束,然後儲存函式輸出。
Durable Functions 擴充功能會以較簡單的程式碼處理此模式:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
展開傳送工作會散發至 F2
函式的多個執行個體。 系統會使用動態工作清單來追蹤此工作。 系統會呼叫 Task.WhenAll
以等候所有已呼叫的函式完成。 然後,F2
函式輸出會從動態工作清單彙總,並傳遞至 F3
函式。
在 Task.WhenAll
的 await
呼叫發生的自動檢查點作業,可確保可能的中途當機或重新開機不需要重新啟動已完成的工作。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
展開傳送工作會散發至 F2
函式的多個執行個體。 系統會使用動態工作清單來追蹤此工作。 系統會呼叫 context.df.Task.all
API 以等候所有已呼叫的函式完成。 然後,F2
函式輸出會從動態工作清單彙總,並傳遞至 F3
函式。
在 context.df.Task.all
的 yield
呼叫發生的自動檢查點作業,可確保可能的中途當機或重新開機不需要重新啟動已完成的工作。
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
展開傳送工作會散發至 F2
函式的多個執行個體。 系統會使用動態工作清單來追蹤此工作。 系統會呼叫 context.task_all
API 以等候所有已呼叫的函式完成。 然後,F2
函式輸出會從動態工作清單彙總,並傳遞至 F3
函式。
在 context.task_all
的 yield
呼叫發生的自動檢查點作業,可確保可能的中途當機或重新開機不需要重新啟動已完成的工作。
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
展開傳送工作會散發至 F2
函式的多個執行個體。 請注意在 F2
函式上呼叫 NoWait
參數的使用方式:此交換器無須等待活動完成,即可讓協調器繼續叫用 F2
。 系統會使用動態工作清單來追蹤此工作。 系統會呼叫 Wait-ActivityFunction
命令以等候所有已呼叫的函式完成。 然後,F2
函式輸出會從動態工作清單彙總,並傳遞至 F3
函式。
在 Wait-ActivityFunction
呼叫發生的自動檢查點作業,可確保可能的中途當機或重新開機不需要重新啟動已完成的工作。
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
展開傳送工作會散發至 F2
函式的多個執行個體。 系統會使用動態工作清單來追蹤此工作。 系統會呼叫 ctx.allOf(parallelTasks).await()
以等候所有已呼叫的函式完成。 然後,F2
函式輸出會從動態工作清單彙總,並以協調器函式的輸出傳回。
在 ctx.allOf(parallelTasks)
的 .await()
呼叫發生的自動檢查點作業,可確保未預期的處理程序回收任何不需要重新啟動已完成的工作。
注意
在少數的情況下,在活動函式完成後,但將其完成儲存到協調流程歷程記錄前的空窗期中可能會發生損毀。 如果發生這種情況,則會在程序復原之後從頭重新執行活動函式。
模式 #3:非同步的 HTTP API
非同步 HTTP API 模式可處理與外部用戶端協調長時間執行作業狀態的問題。 實作此模式的常見方式是讓 HTTP 端點觸發長時間執行的動作。 然後,將用戶端重新導向至用戶端輪詢的狀態端點,以了解作業何時完成。
Durable Functions 為此模式提供內建支援,簡化或甚至移除與長時間執行的函式執行互動所需撰寫的程式碼。 例如,Durable Functions 快速入門範例 (C#、JavaScript、TypeScript、Python、PowerShell 和 Java) 會顯示一個簡單的 REST 命令,您可將此命令用來啟動新的協調器函式執行個體。 在執行個體啟動之後,延伸模組會公開 Webhook HTTP API,其會查詢協調器函式狀態。
下列範例顯示啟動協調器並查詢其狀態的 REST 命令。 為了清楚起見,範例中會省略某些通訊協定詳細資料。
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Durable Functions 執行階段會為您管理狀態,因此您不必實作自己的狀態追蹤機制。
Durable Functions 延伸模組會公開內建的 HTTP API,其會管理長時間執行的協調流程。 您也可以使用自己的函式觸發程序 (例如 HTTP、佇列或 Azure 事件中樞) 和耐久用戶端繫結,自行執行此模式。 例如,您可以使用佇列訊息來觸發終止。 或者,您可以使用由 Microsoft Entra 驗證原則保護的 HTTP 觸發程序,而不是使用內建 HTTP API (其使用產生的金鑰進行驗證)。
如需詳細資訊,請參閱 HTTP 功能一文,其中說明如何使用 Durable Functions 擴充功能,透過 HTTP 公開非同步、長時間執行的程序。
模式 #4:監視器
監視模式是指工作流程中靈活的週期性流程。 一項範例是輪詢直到滿足特定條件。 您可以一般計時器觸發程序來解決基本案例,例如定期清除作業,但其間隔是靜態的,且管理執行個體存留期會變得很複雜。 您可以使用 Durable Functions 來建立彈性的週期間隔、管理工作存留期,以及從單一協調流程建立多個監視流程。
監視模式的範例是反轉先前的非同步 HTTP API 案例。 長期執行的監視器會取用外部端點,而後等待狀態變更,而不會公開端點,以供外部用戶端監視長時間執行的作業。
在幾行程式碼中,您可以使用 Durable Functions,建立多個觀察任意端點的監視器。 監視器可以在符合條件時結束執行,或者另一個函式可以使用永久性協調流程用戶端來終止監視器。 您可以根據特定條件 (例如,指數輪詢) 來變更監視器的 wait
間隔時間。
下列程式碼會實作基本監視器:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
收到要求時,系統會針對該作業識別碼建立新的協調流程執行個體。 執行個體會輪詢狀態,直到符合條件或逾時到期為止。 長期計時器可控制輪詢間隔。 接著可執行更多工作,否則協調流程可能會結束。
模式 #5:人為互動
許多自動化流程涉及某種人類互動。 讓人類參與自動化流程相當棘手,因為人員並不像雲端服務那樣經常有空且回應迅速。 自動化流程可能會使用逾時和補償邏輯來允許此互動。
核准程序是需要人為互動的商務程序範例。 超過特定金額的費用報告就可能需要管理員核准。 如果經理未在 72 小時內核准費用報告 (可能是經理休假了),系統就會啟動向上呈報程序,以尋求其他人的核准 (或許是經理的經理)。
您可以在此範例中使用協調器函式實作模式。 協調器會使用長期計時器來要求核准。 如果發生逾時,協調器就會呈報。 協調器會等待外部事件,例如人為互動所產生的通知。
這些範例會建立核准程序,以示範人類互動模式:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
若要建立長期計時器,請呼叫 context.CreateTimer
。 通知則由 context.WaitForExternalEvent
接收。 接著會呼叫 Task.WhenAny
,以決定是要向上呈報 (先發生逾時) 還是處理核准 (逾時前收到核准)。
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
若要建立長期計時器,請呼叫 context.df.createTimer
。 通知則由 context.df.waitForExternalEvent
接收。 接著會呼叫 context.df.Task.any
,以決定是要向上呈報 (先發生逾時) 還是處理核准 (逾時前收到核准)。
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
若要建立長期計時器,請呼叫 context.create_timer
。 通知則由 context.wait_for_external_event
接收。 接著會呼叫 context.task_any
,以決定是要向上呈報 (先發生逾時) 還是處理核准 (逾時前收到核准)。
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
若要建立長期計時器,請呼叫 Start-DurableTimer
。 通知則由 Start-DurableExternalEventListener
接收。 接著會呼叫 Wait-DurableTask
,以決定是要向上呈報 (先發生逾時) 還是處理核准 (逾時前收到核准)。
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
ctx.waitForExternalEvent(...).await()
方法呼叫會暫停協調流程,直到它收到名為 ApprovalEvent
的事件,其具有 boolean
承載。 如果收到事件,則會呼叫活動函式來處理核准結果。 不過,如果在 timeout
(72 小時) 到期之前未收到這類事件,就會引發 TaskCanceledException
且會呼叫 Escalate
活動函式。
注意
在使用量方案中執行時,等候外部事件所花費的時間不收費。
外部用戶端可以使用內建的 HTTP API,將事件通知傳遞至等候中的協調器函式:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
此外也可從相同函式應用程式中的另一個函式使用長期協調流程用戶端來引發事件:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
模式 #6:彙總工具 (具狀態實體)
第六個模式是關於將一段時間的事件資料彙總成單一可定址的「實體」。 在此模式中,所彙總的資料可能來自多個來源、可分批傳遞,或可散佈於長期的時間。 彙總工具可能需要在事件資料送達時對其採取動作,而外部用戶端可能需要查詢所彙總的資料。
嘗試使用一般無狀態函式來實作此模式的弔詭之處在於,並行控制會變成一大挑戰。 您不只需要擔心多個執行緒同時修改相同的資料,也需要擔心如何確保彙總工具一次只在單一 VM 上執行。
您可以使用耐久性實體,輕鬆地將此模式實作為單一函式。
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
耐久性實體也可以模型化為 .NET 中的類別。 如果作業清單固定且變得很大,此模型會很實用。 下列範例等同於使用 .NET 類別和方法來實作 Counter
實體。
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
注意
PowerShell 目前不支援耐久性實體。
注意
JAVA 目前不支援耐久性實體。
用戶端可以使用實體用戶端繫結,將實體函式的「作業」排入佇列 (也稱為「訊號處理」)。
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
注意
動態產生的 Proxy 也適用於在 .NET 中以型別安全方式進行實體訊號處理。 除了訊號處理外,用戶端也可以在協調流程用戶端繫結上使用型別安全方法來查詢實體函式的狀態。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
實體函式可在 C#、JavaScript 和 Python 的 Durable Functions 2.0 和更新版本中取得。
技術
Durable Functions 擴充功能其實是以長期工作架構為基礎所建置的,長期工作架構是 GitHub 上以程式碼用來建置工作流程的開放原始碼程式庫。 如同 Azure Functions 是 Azure WebJobs 的無伺服器演化,Durable Functions 也是長期工作架構的無伺服器演化。 Microsoft 和其他組織會廣泛地使用長期工作架構,將任務關鍵程序自動化。 它天生就適合無伺服器的 Azure Functions 環境。
程式碼條件約束
為了提供可靠且長時間執行的執行保證,協調器函式具有一組必須遵循的編碼規則。 如需詳細資訊,請參閱協調器函式程式碼條件約束一文。
計費
Durable Functions 的收費方式與 Azure Functions 相同。 如需詳細資訊,請參閱 Azure Functions 價格。 在 Azure Functions 取用方案中執行協調器函式時,有一些要注意的計費行為。 如需這些行為的詳細資訊,請參閱 Durable Functions 計費一文。
立即參與
完成下列其中一個特定語言快速入門教學課程,即可在 10 分鐘內開始使用 Durable Functions:
- 使用 Visual Studio 2019 的 C#
- 使用 Visual Studio Code 的 JavaScript
- 使用 Visual Studio Code 的 TypeScript
- 使用 Visual Studio Code 的 Python
- 使用 Visual Studio Code 的 PowerShell
- 使用 Maven 的 JAVA
在這些快速入門中,您會在本機建立及測試 "hello world" 耐久函式。 接著會將函式程式碼發佈至 Azure。 您建立的函式會協調對其他函式的呼叫並鏈結在一起。
發行集
Durable Functions 是與 Microsoft Research 共同開發而成。 因此,Durable Functions 小組會主動產生研究論文和成品;其中包括:
- Durable Functions:具狀態無伺服器的語意 (OOPSLA'21)
- Durable Functions 和 Netherite 的無伺服器工作流程 (列印前)
深入了解
下列影片特別說明 Durable Functions 的優點:
因為 Durable Functions 是 Azure Functions 的進階擴充功能,因此並非所有應用程式都適用。 如需與其他 Azure 協調流程技術的比較,請參閱比較 Azure Functions 和 Azure Logic Apps。