Durable Functions ist eine Erweiterung von Azure Functions, mit der Sie zustandsbehaftete Funktionen in einer serverlosen Compute-Umgebung schreiben können. Mit der Erweiterung können Sie mithilfe des Azure Functions-Programmiermodells zustandsbehaftete Workflows durch Schreiben von Orchestratorfunktionen und zustandsbehaftete Entitäten durch Schreiben von Entitätsfunktionen definieren. Im Hintergrund verwaltet die Erweiterung Zustand, Prüfpunkte und Neustarts für Sie, sodass Sie sich auf Ihre Geschäftslogik konzentrieren können.
Unterstützte Sprachen
Durable Functions ist darauf ausgelegt, mit allen Azure Functions-Programmiersprachen zu funktionieren, hat aber möglicherweise unterschiedliche Mindestanforderungen für jede Sprache. In der folgenden Tabelle werden die mindestens unterstützten App-Konfigurationen aufgeführt:
Sprachstapel
Versionen der Azure Functions-Runtime
Version des Sprachworkers
Paketmindestversion
.NET/C#/F#
Functions 1.0 oder höher
In-Process Out-of-Process
–
JavaScript-/TypeScript (V3 Programmiermodell)
Functions 2.0 oder höher
Node 8 oder höher
2.x-Pakete
JavaScript-/TypeScript (V4 Programmiermodell)
Functions 4.16.5+
Node 18+
3.15+-Bündel
Python
Functions 2.0 oder höher
Python 3.7 oder höher
2.x-Pakete
Python (V2-Programmierungsmodell)
Functions 4.0 oder höher
Python 3.7 oder höher
3.15+-Bündel
PowerShell
Functions 3.0 oder höher
PowerShell 7 oder höher
2.x-Pakete
Java
Functions 4.0 oder höher
Java 8 oder höher
4.x-Pakete
Hinweis
Die neuen Programmiermodelle zum Erstellen von Funktionen in Python (V2) und Node.js (V4) befindet sich derzeit in der Vorschauphase. Im Vergleich zu den aktuellen Modellen sind die neuen Umgebungen für Python- und JavaScript-/TypeScript-Entwickler flexibler und intuitiver gestaltet. Weitere Informationen zu den Unterschieden zwischen den Modellen finden Sie im Python-Entwicklerhandbuch und im Node.js Upgradehandbuch.
In den folgenden Codeausschnitten bezeichnet Python (PM2) das Programmiermodell V2 und JavaScript (PM4) das Programmiermodell V4, die neuen Umgebungen.
Wie bei Azure Functions sind Vorlagen verfügbar, um Sie bei der Entwicklung von Durable Functions mit Visual Studio, Visual Studio Code und dem Azure-Portal zu unterstützen.
Anwendungsmuster
Der primäre Anwendungsfall für Durable Functions ist die Vereinfachung komplexer, zustandsbehafteter Koordinationsanforderungen in serverlosen Anwendungen. In den folgenden Abschnitten werden typische Anwendungsmuster beschrieben, die von Durable Functions profitieren können:
Beim Muster der Funktionsverkettung wird eine Abfolge von Funktionen in einer bestimmten Reihenfolge ausgeführt. Bei diesem Muster wird die Ausgabe einer Funktion als Eingabe einer weiteren Funktion verwendet. Die Verwendung von Warteschlangen zwischen den einzelnen Funktionen stellt sicher, dass das System dauerhaft und skalierbare bleibt, auch wenn es einen Steuerungsfluss von einer Funktion zur nächsten gibt.
Mithilfe von Durable Functions können Sie das Funktionsverkettungsmuster präzise wie im folgenden Beispiel dargestellt implementieren.
In diesem Beispiel sind die Werte F1, F2, F3 und F4 die Namen weiterer Funktionen in der gleichen Funktions-App. Sie können die Ablaufsteuerung mithilfe normaler imperativer Codierungskonstrukte implementieren. Der Code wird von oben nach unten ausgeführt. Der Code kann bestehende sprachliche Ablaufsteuerungssemantik wie Bedingungsanweisungen und Schleifen umfassen. Sie können Logik zur Fehlerbehandlung in try/catch/finally-Blöcken einschließen.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Sie können den Parameter context verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von await im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von await fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
[Function("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Sie können den Parameter context verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von await im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von await fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Sie können das Objekt context.df verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von yield im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von yield fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Hinweis
Das Objekt context in JavaScript stellt den gesamten Funktionskontext dar. Verwenden Sie für den Zugriff auf den Durable Functions-Kontext die Eigenschaft df im Hauptkontext.
Sie können das Objekt context.df verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von yield im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von yield fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Hinweis
Das Objekt context in JavaScript stellt den gesamten Funktionskontext dar. Verwenden Sie für den Zugriff auf den Durable Functions-Kontext die Eigenschaft df im Hauptkontext.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Sie können das Objekt context verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von yield im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von yield fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Hinweis
Das context-Objekt in Python stellt den Orchestrierungskontext dar. Verwenden Sie für den Zugriff auf den Azure Functions-Hauptkontext die Eigenschaft function_context im Orchestrierungskontext.
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
Sie können das Objekt context verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Bei jedem Aufruf von yield im Code erstellt das Framework von Durable Functions Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von yield fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Hinweis
Das context-Objekt in Python stellt den Orchestrierungskontext dar. Verwenden Sie für den Zugriff auf den Azure Functions-Hauptkontext die Eigenschaft function_context im Orchestrierungskontext.
Sie können den Befehl Invoke-DurableActivity verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Jedes Mal, wenn der Code Invoke-DurableActivity ohne den Switch NoWait aufruft, erstellt das Durable Functions-Framework Prüfpunkte zum Status der aktuellen Funktionsinstanz. Wenn der Prozess oder der virtuelle Computer mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von Invoke-DurableActivity fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Sie können das Objekt ctx verwenden, um andere Funktionen anhand des Namens aufzurufen, Parameter zu übergeben und Funktionsausgaben zurückzugeben. Die Ausgabe dieser Methodenaufrufe ist ein Task<V>-Objekt, bei dem V der Datentyp ist, der von der aufgerufenen Funktion zurückgegeben wird. Bei jedem Aufruf von Task<V>.await() erstellt das Durable Functions-Framework Prüfpunkte für den Status der aktuellen Funktionsinstanz. Wenn der Prozess unerwartet mitten in der Ausführung neu gestartet wird, wird die Funktionsinstanz ab dem vorherigen Aufruf von Task<V>.await() fortgesetzt. Weitere Informationen finden Sie im nächsten Abschnitt, Muster 2: Auffächern auswärts/einwärts.
Muster 2: Auffächern auswärts/einwärts
Beim Muster Auffächern auswärts/einwärts werden mehrere Funktionen parallel ausgeführt und anschließend auf den Abschluss aller gewartet. Häufig werden die von den Funktionen zurückgegebenen Ergebnisse einer Aggregation unterzogen.
Bei normalen Funktionen kann das Auffächern auswärts erfolgen, indem die Funktion mehrere Nachrichten an eine Warteschlange sendet. Das Auffächern zurück nach innen ist wesentlich schwieriger. Für das Auffächern nach innen schreiben Sie in einer normalen Funktion Code, der nachverfolgt, wann die von der Warteschlange ausgelösten Funktionen enden und speichern dann ihre Ausgaben.
Die Erweiterung Durable Functions wird diesem Muster mit relativ einfachem Code gerecht:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Task.WhenAll wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von await für Task.WhenAll erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
[Function("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Task.WhenAll wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von await für Task.WhenAll erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Die API context.df.Task.all wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von yield für context.df.Task.all erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
const df = require("durable-functions");
df.app.orchestration("fanOutFanInDemo", function* (context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Die API context.df.Task.all wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von yield für context.df.Task.all erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Die API context.task_all wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von yield für context.task_all erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Die API context.task_all wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von yield für context.task_all erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Beachten Sie die Verwendung des Switches NoWait für den Aufruf der F2-Funktion: Dieser Switch ermöglicht es dem Orchestrator, F2 weiterhin aufzurufen, ohne auf den Abschluss von Aktivitäten zu warten. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. Der Befehl Wait-ActivityFunction wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und an die F3-Funktion übergeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von Wait-ActivityFunction erfolgt, stellt sicher, dass ein potentieller Absturz oder Neustart während der Ausführung keinen Neustart bereits abgeschlossener Aufgaben erfordert.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Das Auffächern nach außen ist auf mehrere Instanzen der F2-Funktion verteilt. Die Arbeit wird mithilfe einer dynamischen Aufgabenliste nachverfolgt. ctx.allOf(parallelTasks).await() wird aufgerufen, um zu warten, bis alle aufgerufenen Funktionen beendet sind. Anschließend werden die Ausgaben der F2-Funktion aus der dynamischen Aufgabenliste aggregiert und als Ausgabe der Orchestratorfunktion zurückgegeben.
Die automatische Prüfpunkterstellung, die beim Aufruf von .await() für ctx.allOf(parallelTasks) erfolgt, stellt sicher, dass ein unerwarteter Prozessneustart keinen Neustart bereits abgeschlossener Aufgaben erfordert.
Hinweis
In seltenen Fällen kann es im Fenster zu einem Absturz kommen, nachdem eine Aktivitätsfunktion abgeschlossen wurde und bevor ihr Abschluss im Orchestrierungsverlauf gespeichert wurde. In dem Fall wird die Aktivitätsfunktion von Anfang an erneut ausgeführt, wenn der Prozess wieder hergestellt wurde.
Muster 3: Asynchrone HTTP-APIs
Das asynchrone HTTP-API-Muster ist geeignet, um den Status von Vorgängen mit langer Ausführungsdauer mit externen Clients zu koordinieren. Ein gängiges Verfahren zum Implementieren dieses Musters besteht darin, die Aktion mit langer Ausführungsdauer von einem HTTP-Endpunkt auslösen zu lassen. Leiten Sie dann den Client zu einem Statusendpunkt um, den der Client abfragt, um herauszufinden, wenn der Vorgang abgeschlossen ist.
Durable Functions bietet integrierte Unterstützung für dieses Muster und vereinfacht oder entfernt den Code, den Sie für die Interaktion mit langen Funktionsausführungen schreiben. Die Schnellstartbeispiele für Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell und Java) enthalten beispielsweise einen einfachen REST-Befehl, den Sie zum Starten neuer Orchestratorfunktionsinstanzen verwenden können. Nach dem Start einer Instanz macht die Erweiterung Webhook-HTTP-APIs verfügbar, die den Status der Orchestratorfunktion abfragen.
Das folgende Beispiel zeigt REST-Befehle zum Starten eines Orchestrators und zum Abfragen seines Status. Zur besseren Übersichtlichkeit werden im Beispiel einige Protokolldetails weggelassen.
Da der Status von der Durable Functions-Runtime für Sie verwaltet wird, müssen Sie keinen eigenen Mechanismus zur Statusnachverfolgung implementieren.
Die Durable Functions-Erweiterung macht integrierte HTTP-APIs verfügbar, die Orchestrierungen mit langer Ausführungsdauer verwalten. Sie können dieses Muster auch selbst implementieren, indem Sie Ihre eigenen Funktionstrigger (z. B. HTTP, eine Warteschlange oder Azure Event Hubs) und die dauerhafte Clientbindung verwenden. Sie können beispielsweise eine Warteschlangennachricht verwenden, um die Beendigung auszulösen. Sie könnten anstelle der integrierten HTTP-APIs, die einen generierten Schlüssel für die Authentifizierung verwenden, auch einen HTTP-Trigger verwenden, der durch eine Azure Active Directory-Authentifizierungsrichtlinie geschützt ist.
Weitere Informationen finden Sie im Artikel HTTP-Features, in dem erläutert wird, wie Sie asynchrone Prozesse mit langer Ausführungszeit über HTTP mithilfe der Durable Functions-Erweiterung verfügbar machen können.
Muster 4: Überwachen
Das Überwachen-Muster bezieht sich auf einen flexiblen, wiederkehrenden Vorgang in einem Workflow. Ein Beispiel besteht im Abfragen, bis bestimmte Bedingungen erfüllt sind. Sie können einen normalen Timertrigger für ein einfaches Szenario verwenden, beispielsweise einen periodischen Bereinigungsauftrag. Sein Intervall ist jedoch statisch, und die Verwaltung der Instanzlebensdauer wird komplex. Mithilfe von Durable Functions können Sie flexible Wiederholungsintervalle erstellen, die Lebensdauer von Aufgaben verwalten und mehrere Überwachungsprozesse aus einer einzelnen Orchestrierung erstellen.
Ein Beispiel für das Überwachen-Muster besteht in der Umkehrung des früheren asynchronen HTTP-API-Szenarios. Anstatt einen Endpunkt für einen externen Client freizugeben, um einen lang laufenden Vorgang zu überwachen, belegt der lang laufende Monitor einen externen Endpunkt und wartet dann auf einen Zustandswechsel.
Mit ein paar Codezeilen können Sie Durable Functions dazu verwenden, mehrere Monitore zu erstellen, die beliebige Endpunkte beobachten. Die Monitore können die Ausführung beenden, wenn eine Bedingung erfüllt ist, oder eine andere Funktion kann den langlebigen Orchestrierungsclient verwenden, um die Monitore zu beenden. Sie können das wait-Intervall eines Monitors auf der Grundlage einer spezifischen Bedingung (z.B. exponentielles Backoff) ändern.
Der folgende Code implementiert einen einfachen Monitor:
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
[Function("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context, int jobId)
{
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
const df = require("durable-functions");
const { DateTime } = require("luxon");
df.app.orchestration("monitorDemo", function* (context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (DateTime.fromJSDate(context.df.currentUtcDateTime) < DateTime.fromJSDate(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", machineId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = DateTime.fromJSDate(context.df.currentUtcDateTime).plus({
seconds: pollingInterval,
});
yield context.df.createTimer(nextCheck.toJSDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
import json
from datetime import timedelta
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
Wenn eine Anforderung empfangen wird, wird eine neue Orchestrierungsinstanz für diese Auftrags-ID erstellt. Die Instanz abruft einen Status ab, bis eine Bedingung erfüllt ist oder bis ein Timeout abläuft. Ein permanenter Timer steuert das Abrufintervall. Anschließend können weitere Arbeitsschritte ausgeführt werden, oder die Orchestrierung wird beendet.
Muster 5: Benutzerinteraktion
Viele automatisierte Prozesse enthalten eine Form der Benutzerinteraktion. Das Einbeziehen von Menschen in einen automatisierten Prozess ist schwierig, da Personen nicht im gleichen hohen Maß verfügbar und reaktionsfähig sind wie Clouddienste. Ein automatisierter Prozess kann diese Interaktion mithilfe von Zeitlimits und Kompensationslogik ermöglichen.
Ein Genehmigungsprozess ist ein Beispiel für einen Geschäftsprozesses, der Benutzerinteraktion umfasst. Beispielsweise kann für eine Spesenabrechnung, die einen bestimmten Betrag überschreitet, die Genehmigung eines Vorgesetzten erforderlich sein. Wenn der Vorgesetzte die Spesenabrechnung nicht innerhalb von 72 Stunden genehmigt (vielleicht weil er im Urlaub ist), wird ein Eskalationsverfahren wirksam, um die Genehmigung von einer anderen Person (z.B. dem Vorgesetzten des Vorgesetzten) zu erhalten.
Sie können das Muster aus diesem Beispiel mithilfe einer Orchestrierungsfunktion implementieren. Der Orchestrator verwendet einen permanenten Timer, um die Genehmigung anzufordern. Der Orchestrator führt die Eskalation aus, wenn das Timeout auftritt. Der Orchestrator wartet auf ein externes Ereignis, beispielsweise eine Benachrichtigung, die durch eine Benutzerinteraktion generiert wird.
In diesen Beispielen wird ein Genehmigungsprozess erstellt, um das Muster der Benutzerinteraktion zu veranschaulichen:
Rufen Sie zum Erstellen des permanenten Timers context.CreateTimer auf. Die Benachrichtigung wird von context.WaitForExternalEvent empfangen. Anschließend wird Task.WhenAny aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers context.CreateTimer auf. Die Benachrichtigung wird von context.WaitForExternalEvent empfangen. Anschließend wird Task.WhenAny aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers context.df.createTimer auf. Die Benachrichtigung wird von context.df.waitForExternalEvent empfangen. Anschließend wird context.df.Task.any aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers context.df.createTimer auf. Die Benachrichtigung wird von context.df.waitForExternalEvent empfangen. Anschließend wird context.df.Task.any aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers context.create_timer auf. Die Benachrichtigung wird von context.wait_for_external_event empfangen. Anschließend wird context.task_any aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers context.create_timer auf. Die Benachrichtigung wird von context.wait_for_external_event empfangen. Anschließend wird context.task_any aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
Rufen Sie zum Erstellen des permanenten Timers Start-DurableTimer auf. Die Benachrichtigung wird von Start-DurableExternalEventListener empfangen. Anschließend wird Wait-DurableTask aufgerufen, um zu entscheiden, ob eine Eskalation erfolgt (Timeout tritt zuerst auf) oder die Genehmigung verarbeitet wird (Genehmigung wird vor dem Timeout empfangen).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Der ctx.waitForExternalEvent(...).await()-Methodenaufruf hält die Orchestrierung an, bis ein Ereignis mit dem Namen ApprovalEvent empfangen wird, das über boolean-Nutzdaten verfügt. Bei empfang des Ereignisses wird eine Aktivitätsfunktion aufgerufen, um das Genehmigungsergebnis zu verarbeiten. Wenn jedoch vor Ablauf des timeout (72 Stunden) kein solches Ereignis empfangen wurde, wird eine TaskCanceledException ausgelöst und die Escalate-Aktivitätsfunktion aufgerufen.
Hinweis
Es fällt im Verbrauchstarif keine Gebühr für die Zeit an, die beim Ausführen auf externe Ereignisse gewartet wird.
Ein externer Client kann die Ereignisbenachrichtigung über die integrierten HTTP-APIs an eine wartende Orchestratorfunktion senden:
Beim sechsten Muster geht es um Aggregierung von Ereignisdaten über einen bestimmten Zeitraum in einer einzigen, adressierbaren Entität. In diesem Muster können die aggregierten Daten aus mehreren Quellen stammen, in Batches geliefert werden und über lange Zeiträume verteilt sein. Der Aggregator muss möglicherweise Aktionen für Ereignisdaten durchführen, wenn er diese empfängt, und es kann sein, dass externe Daten die aggregierten Daten abfragen müssen.
Das Schwierige an der Implementierung dieses Musters mit normalen, zustandslosen Funktionen ist die Tatsache, dass das Steuern der Parallelität zur Herausforderung wird. Sie müssen sich nicht nur um mehrere Threads kümmern, die gleichzeitig dieselben Daten anpassen, sondern Sie müssen auch sicherstellen, dass der Aggregator immer nur auf einer VM ausgeführt wird.
Sie können dauerhafte Entitäten verwenden, um dieses Muster problemlos als einzelne Funktion zu implementieren.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Dauerhafte Entitäten können auch als Klassen in .NET modelliert werden. Dieses Modell ist bei einer festen Liste von Vorgängen hilfreich, die recht groß wird. Beim folgenden Beispiel handelt es sich um eine äquivalente Implementierung der Counter-Entität unter Verwendung von .NET-Klassen und -Methoden.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
Dauerhafte Entitäten werden in isolierten .NET-Workerprozessen derzeit nicht unterstützt.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Hinweis
Außerdem stehen dynamisch generierte Proxys in .NET für signalisierende Entitäten auf typsichere Weise zur Verfügung. Zusätzlich zur Signalisierung können Clients auch den Zustand einer Entitätsfunktion mithilfe typsicherer Methoden für die Orchestrierungsclientbindung abfragen.
Dauerhafte Entitäten werden in isolierten .NET-Workerprozessen derzeit nicht unterstützt.
Dauerhafte Entitäten werden in PowerShell derzeit nicht unterstützt.
Dauerhafte Entitäten werden derzeit in Java nicht unterstützt.
Entitätsfunktionen stehen ab Durable Functions 2.0 für C#, JavaScript und Python zur Verfügung.
Die Technologie
Im Hintergrund baut die Durable Functions-Erweiterung auf dem Durable Task Framework auf, einer Open-Source-Bibliothek auf GitHub, die zum Erstellen von Workflows im Code verwendet wird. Wie Azure Functions die serverlose Weiterentwicklung von Azure WebJobs ist, so ist Durable Functions die serverlose Weiterentwicklung von Durable Task Framework. Microsoft und andere Unternehmen verwenden das Durable Task Framework intensiv zum Automatisieren unternehmenswichtiger Prozesse. Es ist wie geschaffen für die serverlose Azure Functions-Umgebung.
Codeeinschränkungen
Damit eine zuverlässige und lange Ausführung gewährleistet ist, gelten für Orchestratorfunktionen einige Programmierregeln, die beachtet werden müssen. Weitere Informationen finden Sie im Artikel Codeeinschränkungen für Orchestratorfunktionen.
Abrechnung
Durable Functions wird genau wie Azure Functions in Rechnung gestellt. Weitere Informationen finden Sie unter Azure Functions – Preise. Beim Ausführen von Orchestratorfunktionen im Nutzungsplan von Azure Functions sind einige Verhaltensweisen zu beachten, die bei der Abrechnung auftreten. Weitere Informationen zu diesen Verhaltensweisen finden Sie im Artikel Abrechnung von Durable Functions.
Sofort loslegen
Sie können die ersten Schritte mit Durable Functions in weniger als 10 Minuten durchführen, indem Sie eines dieser sprachspezifischen Schnellstarttutorials abschließen:
In diesen Schnellstarts erstellen und testen Sie lokal eine dauerhafte Funktion vom Typ „Hallo Welt“. Anschließend veröffentlichen Sie den Funktionscode in Azure. Mit der von Ihnen erstellten Funktion werden Aufrufe anderer Funktionen orchestriert und miteinander verkettet.
Veröffentlichungen
Durable Functions wird in Zusammenarbeit mit Microsoft Research entwickelt. Daher erstellt das Durable Functions-Team aktiv Forschungsberichte und Artefakte, u. a.:
Im folgenden Video werden die Vorteile von Durable Functions aufgezeigt:
Eine ausführlichere Erläuterung von Durable Functions und der zugrunde liegenden Technologie finden Sie im folgenden Video (es bezieht sich auf .NET, aber die Konzepte gelten aber auch für andere unterstützte Sprachen):