Mik azok a tartós függvények?
A Durable Functions az Azure Functions egyik funkciója, amellyel állapotalapú függvényeket írhat kiszolgáló nélküli számítási környezetben. A bővítmény lehetővé teszi az állapotalapú munkafolyamatok meghatározását vezénylő függvények és állapotalapú entitások írásával az entitásfüggvények Azure Functions-programozási modellel történő írásával. A háttérben a bővítmény kezeli az állapotot, az ellenőrzőpontokat és az újraindításokat, így az üzleti logikára összpontosíthat.
Támogatott nyelvek
A Durable Functions úgy lett kialakítva, hogy az Azure Functions összes programozási nyelvével működjön, de az egyes nyelvekre eltérő minimális követelmények vonatkoznak. Az alábbi táblázat a minimálisan támogatott alkalmazáskonfigurációkat mutatja be:
Nyelvi verem | Azure Functions Runtime-verziók | Nyelvi feldolgozó verziója | A csomagok minimális verziója |
---|---|---|---|
.NET / C# / F# | Functions 1.0+ | Folyamatban Folyamaton kívüli |
n.a. |
JavaScript/TypeScript (V3 prog. modell) | Functions 2.0+ | Node 8+ | 2.x csomagok |
JavaScript/TypeScript (V4 prog. modell) | Functions 4.25+ | Node 18+ | 3,15+ csomag |
Python | Functions 2.0+ | Python 3.7+ | 2.x csomagok |
Python (V2 prog. modell) | Functions 4.0+ | Python 3.7+ | 3,15+ csomag |
PowerShell | Functions 3.0+ | PowerShell 7+ | 2.x csomagok |
Java | Functions 4.0+ | Java 8+ | 4.x csomagok |
Fontos
Ez a cikk lapokat használ a Node.js programozási modell több verziójának támogatásához. A v4-modell általánosan elérhető, és úgy lett kialakítva, hogy rugalmasabb és intuitívabb felhasználói élményt nyújtson JavaScript- és TypeScript-fejlesztők számára. A v4-modell működésével kapcsolatos további információkért tekintse meg az Azure Functions Node.js fejlesztői útmutatóját. A v3 és a v4 közötti különbségekről a migrálási útmutatóban olvashat bővebben.
Fontos
Ez a cikk lapokat használ a Python programozási modell több verziójának támogatásához. A v2 modell általánosan elérhető, és úgy lett kialakítva, hogy kódközpontúbb módot biztosítson a függvények dekorátorokon keresztüli létrehozásához. A v2-modell működésével kapcsolatos további részletekért tekintse meg az Azure Functions Python fejlesztői útmutatóját.
Az Azure Functionshez hasonlóan a Durable Functions fejlesztéséhez is vannak sablonok a Visual Studio, a Visual Studio Code és az Azure Portal használatával.
Alkalmazásminták
A Durable Functions elsődleges használati esete a kiszolgáló nélküli alkalmazások összetett, állapotalapú koordinációs követelményeinek egyszerűsítése. A következő szakaszok a Durable Functions előnyeit élvező tipikus alkalmazásmintákat ismertetik:
- Függvényláncolás
- Kifúvatás/befúvatás
- Aszinkron HTTP API-k
- Figyelés
- Emberi interakció
- Összesítő (állapotalapú entitások)
1. minta: Függvényláncolás
A függvényláncolási mintában a függvények sorozata egy adott sorrendben fut. Ebben a mintában az egyik függvény kimenete egy másik függvény bemenetére lesz alkalmazva. Az egyes függvények közötti üzenetsorok használata biztosítja, hogy a rendszer tartós és méretezhető maradjon, még akkor is, ha az egyik függvényről a másikra irányít.
A Durable Functions használatával a függvényláncoló mintát tömören implementálhatja az alábbi példában látható módon.
Ebben a példában az értékek F1
F2
F3
, és F4
az ugyanabban a függvényalkalmazásban található egyéb függvények nevei. A vezérlési folyamatot normál imperatív kódolási szerkezetekkel valósíthatja meg. A kód felülről lefelé fut. A kód magában foglalhatja a meglévő nyelvi vezérlési folyamat szemantikáját, például a feltételes feltételeket és a hurkokat. A hibakezelési logikát blokkokban try
//catch
finally
is megadhatja.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
A paraméter használatával context
meghívhat más függvényeket név, paraméterek átadása és függvénykimenet alapján. A kód minden hívásakor await
a Durable Functions-keretrendszer ellenőrzi az aktuális függvénypéldány állapotát. Ha a folyamat vagy a virtuális gép a végrehajtás közben félúton újraindul, a függvénypéldány az előző await
hívásból folytatódik. További információkért lásd a következő, 2. minta: Ventilátor ki/ventilátor be.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
Az objektum használatával context.df
más függvényeket hívhat meg név, paraméterek átadása és függvénykimenet visszaadása alapján. A kód minden hívásakor yield
a Durable Functions-keretrendszer ellenőrzi az aktuális függvénypéldány állapotát. Ha a folyamat vagy a virtuális gép a végrehajtás közben félúton újraindul, a függvénypéldány az előző yield
hívásból folytatódik. További információkért lásd a következő, 2. minta: Ventilátor ki/ventilátor be.
Feljegyzés
A context
JavaScript objektuma a teljes függvénykörnyezetet jelöli. A Durable Functions-környezet elérése a df
fő környezet tulajdonságával.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Az objektum használatával context
más függvényeket hívhat meg név, paraméterek átadása és függvénykimenet visszaadása alapján. A kód minden hívásakor yield
a Durable Functions-keretrendszer ellenőrzi az aktuális függvénypéldány állapotát. Ha a folyamat vagy a virtuális gép a végrehajtás közben félúton újraindul, a függvénypéldány az előző yield
hívásból folytatódik. További információkért lásd a következő, 2. minta: Ventilátor ki/ventilátor be.
Feljegyzés
A context
Python objektuma a vezénylési környezetet jelöli. A fő Azure Functions-környezet elérése a function_context
vezénylési környezet tulajdonságával.
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
A parancs használatával Invoke-DurableActivity
más függvényeket hívhat meg név, paraméterek átadása és függvénykimenet visszaadása alapján. Minden alkalommal, amikor a kód kapcsoló nélkül NoWait
hívInvoke-DurableActivity
, a Durable Functions-keretrendszer ellenőrzi az aktuális függvénypéldány állapotát. Ha a folyamat vagy a virtuális gép a végrehajtás közben félúton újraindul, a függvénypéldány az előző Invoke-DurableActivity
hívásból folytatódik. További információkért lásd a következő, 2. minta: Ventilátor ki/ventilátor be.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Az objektum használatával ctx
más függvényeket hívhat meg név, paraméterek átadása és függvénykimenet visszaadása alapján. Ezeknek a metódusoknak a kimenete egy Task<V>
objektum, ahol V
a meghívott függvény által visszaadott adatok típusa. Minden híváskor Task<V>.await()
a Durable Functions-keretrendszer ellenőrzi az aktuális függvénypéldány állapotát. Ha a folyamat váratlanul újraindul a végrehajtás közben, a függvénypéldány az előző Task<V>.await()
hívásból folytatódik. További információkért lásd a következő, 2. minta: Ventilátor ki/ventilátor be.
Minta #2: Ventilátor ki/ventilátor a
A mintában lévő ventilátor/ventilátor esetében több függvényt kell párhuzamosan végrehajtania, majd várnia kell, amíg az összes függvény befejeződik. A függvényekből visszaadott eredményeken gyakran végeznek összesítési munkát.
Normál függvények használatakor a függvény több üzenetet is elküldhet egy üzenetsorba. A visszatérés sokkal nagyobb kihívást jelent. Ha normál függvényben szeretne ventilátort használni, írjon egy kódot, amely nyomon követi, hogy mikor fejeződnek be az üzenetsor által aktivált függvények, majd tárolja a függvény kimeneteit.
A Durable Functions bővítmény viszonylag egyszerű kóddal kezeli ezt a mintát:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
A kisugárzó munka a függvény több példányára F2
van elosztva. A munka nyomon követése a tevékenységek dinamikus listájával történik. Task.WhenAll
a rendszer meghívja, hogy várja meg, amíg az összes hívott függvény befejeződik. Ezután a F2
függvénykimenetek a dinamikus feladatlistából összesítve kerülnek átadásra a F3
függvénynek.
A híváskor await
Task.WhenAll
előforduló automatikus ellenőrzőpont-ellenőrzés biztosítja, hogy egy lehetséges félúti összeomlás vagy újraindítás ne igényelje a már befejezett feladat újraindítását.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
A kisugárzó munka a függvény több példányára F2
van elosztva. A munka nyomon követése a tevékenységek dinamikus listájával történik. context.df.Task.all
Az API azért van meghívva, hogy megvárja az összes hívott függvény befejezését. Ezután a F2
függvénykimenetek a dinamikus feladatlistából összesítve kerülnek átadásra a F3
függvénynek.
A híváskor yield
context.df.Task.all
előforduló automatikus ellenőrzőpont-ellenőrzés biztosítja, hogy egy lehetséges félúti összeomlás vagy újraindítás ne igényelje a már befejezett feladat újraindítását.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
A kisugárzó munka a függvény több példányára F2
van elosztva. A munka nyomon követése a tevékenységek dinamikus listájával történik. context.task_all
Az API azért van meghívva, hogy megvárja az összes hívott függvény befejezését. Ezután a F2
függvénykimenetek a dinamikus feladatlistából összesítve kerülnek átadásra a F3
függvénynek.
A híváskor yield
context.task_all
előforduló automatikus ellenőrzőpont-ellenőrzés biztosítja, hogy egy lehetséges félúti összeomlás vagy újraindítás ne igényelje a már befejezett feladat újraindítását.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
A kisugárzó munka a függvény több példányára F2
van elosztva. Vegye figyelembe a kapcsoló használatát a NoWait
F2
függvényhíváson: ez a kapcsoló lehetővé teszi, hogy a vezénylő a tevékenység befejezésére való várakozás nélkül folytassa az invokálást F2
. A munka nyomon követése a tevékenységek dinamikus listájával történik. A Wait-ActivityFunction
parancs úgy van meghívva, hogy várja meg, amíg az összes hívott függvény befejeződik. Ezután a F2
függvénykimenetek a dinamikus feladatlistából összesítve kerülnek átadásra a F3
függvénynek.
A híváskor Wait-ActivityFunction
előforduló automatikus ellenőrzőpont-ellenőrzés biztosítja, hogy egy lehetséges félúti összeomlás vagy újraindítás ne igényelje a már befejezett feladat újraindítását.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
A kisugárzó munka a függvény több példányára F2
van elosztva. A munka nyomon követése a tevékenységek dinamikus listájával történik. ctx.allOf(parallelTasks).await()
a rendszer meghívja, hogy várja meg, amíg az összes hívott függvény befejeződik. Ezután a F2
függvénykimenetek összesítve lesznek a dinamikus feladatlistából, és a vezénylő függvény kimeneteként lesznek visszaadva.
Az automatikus ellenőrzőpontozás, amely a .await()
híváskor ctx.allOf(parallelTasks)
történik, biztosítja, hogy egy váratlan folyamat-újrafeldolgozás ne igényelje a már befejezett tevékenységek újraindítását.
Feljegyzés
Ritkán előfordulhat, hogy egy tevékenységfüggvény befejeződése után, de a befejezés előtt összeomlás történhet az ablakban a vezénylési előzményekbe. Ha ez történik, a tevékenységfüggvény az elejétől kezdve fut újra a folyamat helyreállítása után.
3. minta: Async HTTP API-k
Az aszinkron HTTP API-minta a hosszú ideig futó műveletek állapotának külső ügyfelekkel való összehangolásával kapcsolatos problémát oldja meg. A minta megvalósításának gyakori módja, ha egy HTTP-végpont aktiválja a hosszan futó műveletet. Ezután átirányítsa az ügyfelet egy állapotvégpontra, amelyet az ügyfél lekérdez, hogy megtudja, mikor fejeződik be a művelet.
A Durable Functions beépített támogatást nyújt ehhez a mintához, leegyszerűsítve vagy akár eltávolítva az íráshoz szükséges kódot a hosszú ideig futó függvények végrehajtásához. A Durable Functions gyorsútmutató-mintái (C#, JavaScript, TypeScript, Python, PowerShell és Java) például egy egyszerű REST-parancsot mutatnak be, amellyel új vezénylőfüggvény-példányokat indíthat el. Egy példány elindítása után a bővítmény elérhetővé teszi a webhook HTTP API-kat, amelyek lekérdezik a vezénylő függvény állapotát.
Az alábbi példa olyan REST-parancsokat mutat be, amelyek elindítanak egy vezénylőt, és lekérdezik annak állapotát. Az egyértelműség kedvéért a példából kihagyunk néhány protokollrészletet.
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
Mivel a Durable Functions futtatókörnyezet kezeli az állapotot, nem kell saját állapotkövetési mechanizmust implementálnia.
A Durable Functions bővítmény beépített HTTP API-kat tesz elérhetővé, amelyek hosszú ideig futó vezényléseket kezelnek. Ezt a mintát saját függvény-eseményindítók (például HTTP, üzenetsor vagy Azure Event Hubs) és tartós ügyfélkötések használatával is implementálhatja. Előfordulhat például, hogy üzenetsor-üzenettel aktiválja a leállítást. Vagy olyan HTTP-eseményindítót is használhat, amelyet egy Microsoft Entra hitelesítési szabályzat véd a hitelesítéshez generált kulcsot használó beépített HTTP API-k helyett.
További információkért tekintse meg a HTTP-szolgáltatásokról szóló cikket, amely bemutatja, hogyan teheti elérhetővé az aszinkron, hosszú ideig futó folyamatokat HTTP-en keresztül a Durable Functions bővítmény használatával.
4. minta: Monitorozás
A monitorozási minta egy munkafolyamat rugalmas, ismétlődő folyamatára utal. Ilyen például a lekérdezés, amíg bizonyos feltételek teljesülnek. Egy alapforgatókönyvhez, például egy időszakos törlési feladathoz használhat rendszeres időzítő-eseményindítót , de az időköz statikus, és a példányok élettartamának kezelése összetettebbé válik. A Durable Functions használatával rugalmas ismétlődési időközöket hozhat létre, kezelheti a tevékenységek élettartamát, és egyetlen vezénylésből több monitorozási folyamatot is létrehozhat.
A monitorozási mintára példa a korábbi aszinkron HTTP API-forgatókönyv megfordítása. Ahelyett, hogy egy végpontot ad ki egy külső ügyfél számára egy hosszú ideig futó művelet figyeléséhez, a hosszú ideig futó figyelő egy külső végpontot használ, majd megvárja az állapotváltozást.
Néhány sornyi kódban a Durable Functions használatával több olyan monitort hozhat létre, amelyek tetszőleges végpontokat figyelnek meg. A figyelők befejezhetik a végrehajtást, ha teljesül egy feltétel, vagy egy másik függvény a tartós vezénylési ügyféllel leállítja a monitorokat. Egy monitor intervallumát wait
egy adott feltétel (például exponenciális visszalépés) alapján módosíthatja.
A következő kód egy alapszintű monitort implementál:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Amikor egy kérés érkezik, létrejön egy új vezénylési példány az adott feladatazonosítóhoz. A példány egy állapotot kérdez le, amíg egy feltétel nem teljesül, vagy amíg az időtúllépés el nem jár. A lekérdezési időközt tartós időzítő vezérli. Ezután több munka is elvégezhető, vagy a vezénylés véget érhet.
5. minta: Emberi interakció
Számos automatizált folyamat valamilyen emberi interakciót foglal magában. Az emberek bevonása egy automatizált folyamatba trükkös, mert az emberek nem olyan magas rendelkezésre állásúak és rugalmasak, mint a felhőszolgáltatások. Egy automatizált folyamat időtúllépéssel és kompenzációs logikával engedélyezheti ezt az interakciót.
A jóváhagyási folyamat egy olyan üzleti folyamat példája, amely emberi interakciót is magában foglal. Előfordulhat, hogy egy vezető jóváhagyására van szükség egy bizonyos dollárösszeget meghaladó költségjelentéshez. Ha a vezető 72 órán belül nem hagyja jóvá a költségelszámolást (lehet, hogy a vezető szabadságra ment), egy eszkalációs folyamat indul el, hogy valaki mástól (esetleg a vezető felettesétől) megkapja a jóváhagyást.
Ebben a példában a mintát vezénylő függvény használatával implementálhatja. A vezénylő tartós időzítővel kér jóváhagyást. A vezénylő eszkalálódik, ha időtúllépés történik. A vezénylő egy külső eseményre vár, például egy emberi interakció által generált értesítésre.
Ezek a példák egy jóváhagyási folyamatot hoznak létre az emberi interakciós minta bemutatásához:
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
A tartós időzítő létrehozásához hívja meg a következőt context.CreateTimer
: . Az értesítés a következőtől érkezik: context.WaitForExternalEvent
. Ezután el kell döntenie, Task.WhenAny
hogy eszkalálja-e a jóváhagyást (az időtúllépés először történik), vagy feldolgozzák a jóváhagyást (a jóváhagyás az időtúllépés előtt érkezik meg).
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
A tartós időzítő létrehozásához hívja meg a következőt context.df.createTimer
: . Az értesítés a következőtől érkezik: context.df.waitForExternalEvent
. Ezután el kell döntenie, context.df.Task.any
hogy eszkalálja-e a jóváhagyást (az időtúllépés először történik), vagy feldolgozzák a jóváhagyást (a jóváhagyás az időtúllépés előtt érkezik meg).
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
A tartós időzítő létrehozásához hívja meg a következőt context.create_timer
: . Az értesítés a következőtől érkezik: context.wait_for_external_event
. Ezután el kell döntenie, context.task_any
hogy eszkalálja-e a jóváhagyást (az időtúllépés először történik), vagy feldolgozzák a jóváhagyást (a jóváhagyás az időtúllépés előtt érkezik meg).
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
A tartós időzítő létrehozásához hívja meg a következőt Start-DurableTimer
: . Az értesítés a következőtől érkezik: Start-DurableExternalEventListener
. Ezután el kell döntenie, Wait-DurableTask
hogy eszkalálja-e a jóváhagyást (az időtúllépés először történik), vagy feldolgozzák a jóváhagyást (a jóváhagyás az időtúllépés előtt érkezik meg).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
A ctx.waitForExternalEvent(...).await()
metódushívás szünetelteti a vezénylést, amíg meg nem kapja a hasznos adatokkal rendelkező boolean
eseménytApprovalEvent
. Ha az esemény megérkezik, a rendszer meghív egy tevékenységfüggvényt a jóváhagyási eredmény feldolgozásához. Ha azonban nem érkezik ilyen esemény a timeout
(72 óra) lejárta előtt, a rendszer létrehoz egy eseményt TaskCanceledException
, és meghívja a Escalate
tevékenységfüggvényt.
Feljegyzés
A használati tervben való futtatáskor a külső eseményekre való várakozással töltött idő nem számít fel díjat.
A külső ügyfél a beépített HTTP API-k használatával kézbesítheti az eseményértesítést egy várakozó vezénylő függvénynek:
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
Egy esemény a tartós vezénylési ügyféllel is létrehozható egy másik függvényből ugyanabban a függvényalkalmazásban:
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
6. minta: Összesítő (állapotalapú entitások)
A hatodik minta az eseményadatok egyetlen, címezhető entitásba való összesítéséről szól. Ebben a mintában az összesítendő adatok több forrásból származhatnak, kötegekben kézbesíthetők, vagy hosszú időszakokra szétszóródhatnak. Előfordulhat, hogy az összesítőnek műveletet kell elvégeznie az eseményadatokon, és előfordulhat, hogy a külső ügyfeleknek le kell kérdezniük az összesített adatokat.
A trükkös dolog, hogy megpróbálja implementálni ezt a mintát a normál, állapot nélküli függvényekkel, az, hogy az egyidejűség-vezérlés hatalmas kihívássá válik. Nem csak amiatt kell aggódnia, hogy egyszerre több szál módosítja ugyanazt az adatot, hanem azzal is foglalkoznia kell, hogy az összesítő egyszerre csak egyetlen virtuális gépen fusson.
A Durable entitásokkal egyszerűen implementálhatja ezt a mintát egyetlen függvényként.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
A tartós entitások osztályként is modellezhetők a .NET-ben. Ez a modell akkor lehet hasznos, ha a műveletek listája rögzített, és nagy lesz. Az alábbi példa az entitás .NET-osztályokat és metódusokat használó egyenértékű implementációja Counter
.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
Feljegyzés
A Tartós entitások jelenleg nem támogatottak a PowerShellben.
Feljegyzés
A Tartós entitások jelenleg nem támogatottak a Java-ban.
Az ügyfelek egy entitásfüggvény műveleteit (más néven "jelzés") is leköthetik az entitásügyfél-kötés használatával.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
Az entitásfüggvények c#, JavaScript és Python esetén a Durable Functions 2.0-s és újabb verziókban érhetők el.
A technológia
A háttérben a Durable Functions bővítmény a Durable Task Frameworkre épül, amely egy nyílt forráskódú kódtár a GitHubon, amely a munkafolyamatok kódban való létrehozásához használható. Ahogy az Azure Functions az Azure WebJobs kiszolgáló nélküli fejlődése, a Durable Functions a Durable Task Framework kiszolgáló nélküli fejlődése. A Microsoft és más szervezetek széles körben használják a Durable Task Frameworkt a kritikus fontosságú folyamatok automatizálására. Ez természetes választás a kiszolgáló nélküli Azure Functions-környezethez.
Kódkorlátozások
A megbízható és hosszú ideig futó végrehajtási garanciák biztosítása érdekében a vezénylő függvények kódolási szabályok készletével rendelkeznek, amelyeket be kell tartani. További információkért tekintse meg az Orchestrator függvénykódjának korlátozásait ismertető cikket.
Számlázás
A Durable Functions számlázása ugyanaz, mint az Azure Functions esetében. További információkért tekintse meg az Azure Functions díjszabását. Ha vezénylőfüggvényeket hajt végre az Azure Functions Consumption-csomagban, néhány számlázási viselkedésről tudnia kell. Ezekről a viselkedésekről további információt a Durable Functions számlázási cikkében talál.
Ugrás jobbra
A Durable Functions használatának első lépéseit 10 perc alatt elvégezheti az alábbi nyelvspecifikus gyorsútmutató-oktatóanyagok egyikével:
- C# a Visual Studio 2019 használatával
- JavaScript a Visual Studio Code használatával
- TypeScript a Visual Studio Code használatával
- Python a Visual Studio Code használatával
- PowerShell a Visual Studio Code használatával
- Java a Maven használatával
Ezekben a rövid útmutatókban helyileg létrehoz és tesztel egy "hello world" tartós függvényt. Ezután közzéteheti a függvénykódot az Azure-ban. A létrehozott függvény vezényléseket hoz létre, és összekapcsolja a más függvényekhez intézett hívásokat.
Kiadványok
A Durable Functions a Microsoft Research szolgáltatással együttműködve készült. Ennek eredményeképpen a Durable Functions csapata aktívan készít kutatási dokumentumokat és összetevőket; ezek közé tartoznak a következők:
- Durable Functions: Szemantikája állapotalapú kiszolgáló nélküli (OOPSLA'21)
- Kiszolgáló nélküli munkafolyamatok Durable Functions és Netherite használatával (előzetes nyomtatás)
További információ
Az alábbi videó a Durable Functions előnyeit emeli ki:
Mivel a Durable Functions az Azure Functions speciális bővítménye, nem minden alkalmazáshoz megfelelő. Más Azure vezénylési technológiákkal való összehasonlításért lásd az Azure Functions és az Azure Logic Apps összehasonlítása című témakört.