Fan-out/Fan-In führt mehrere Funktionen parallel aus und aggregiert dann die Ergebnisse. Dieser Artikel zeigt ein Beispiel, das Durable Functions verwendet, um einige oder alle Website-Inhalte einer App zu Azure Storage zu sichern.
Voraussetzungen
V3-Programmiermodell
V4-Programmiermodell
Fan-out/Fan-In führt mehrere Aktivitäten parallel aus und aggregiert dann die Ergebnisse. In diesem Artikel wird gezeigt, wie Sie das Muster mithilfe der SdKs für dauerhafte Aufgaben für .NET, JavaScript, Python und Java implementieren.
Beschreibung des Szenarios
In diesem Beispiel laden die Funktionen alle Dateien unter einem angegebenen Verzeichnis (rekursiv) in blob Storage hoch. Sie zählen auch die Gesamtanzahl der hochgeladenen Bytes.
Eine einzelne Funktion kann alles verarbeiten, aber sie skaliert nicht. Eine einzelne Funktionsausführung wird auf einem virtuellen Computer (VM) ausgeführt, sodass der Durchsatz auf diese VM beschränkt ist. Zuverlässigkeit ist ein weiterer Aspekt. Wenn der Prozess in der Mitte fehlschlägt oder mehr als fünf Minuten dauert, kann die Sicherung in einem teilweise abgeschlossenem Zustand enden. Anschließend starten Sie die Sicherung neu.
Ein robusterer Ansatz besteht darin, zwei separate Funktionen zu verwenden: eine listet die Dateien auf und fügt einer Warteschlange Dateinamen hinzu, und die anderen liest aus der Warteschlange und lädt die Dateien in BLOB-Speicher hoch. Dieser Ansatz verbessert den Durchsatz und die Zuverlässigkeit, aber Sie müssen die Warteschlange einrichten und verwalten. Entscheidender ist, dass dieser Ansatz die Komplexität bei der Verwaltung und Koordination des Systemzustands erhöht, wie beispielsweise bei der Berichterstattung über die Gesamtanzahl der hochgeladenen Bytes.
Durable Functions bietet all diese Vorteile mit geringem Aufwand.
Im folgenden Beispiel verarbeitet der Orchestrator mehrere Arbeitsaufgaben parallel und aggregiert dann die Ergebnisse. Dieses Muster ist nützlich, wenn Sie Folgendes benötigen:
- Verarbeiten einer Reihe von Elementen, bei denen jedes Element unabhängig voneinander verarbeitet werden kann
- Verteilen von Arbeit auf mehreren Computern für einen besseren Durchsatz
- Aggregierte Ergebnisse aus allen parallelen Vorgängen
Ohne das Lüfter-/Fan-In-Muster verarbeiten Sie elemente entweder sequenziell, wodurch der Durchsatz begrenzt wird, oder Sie verwalten Ihre eigene Warteschlangen- und Koordinationslogik, wodurch Komplexität hinzugefügt wird.
Die SDKs für dauerhafte Aufgaben behandeln Parallelisierung und Koordination, sodass das Muster einfach zu implementieren ist.
Die Funktionen
In diesem Artikel werden die Funktionen in der Beispiel-App beschrieben:
-
E2_BackupSiteContent: Eine Orchestratorfunktion , die aufruft E2_GetFileList , um eine Liste der zu sichernden Dateien abzurufen, und dann für jede Datei aufruft E2_CopyFileToBlob .
-
E2_GetFileList: Eine Aktivitäts Funktion, die eine Liste der Dateien in einem Verzeichnis zurückgibt.
-
E2_CopyFileToBlob: Eine Aktivitätsfunktion, die eine einzelne Datei in den Azure-Blob-Speicher sichert.
In diesem Artikel werden die Komponenten im Beispielcode beschrieben:
-
ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator oder FanOutFanIn_WordCount: Ein Orchestrator, der die Arbeit an mehreren Aktivitäten parallel ausführt, wartet, bis alle Aktivitäten abgeschlossen sind, und konsolidiert dann, indem die Ergebnisse aggregiert werden.
-
ProcessWorkItemActivity, processWorkItem, oder process_work_itemCountWords: Eine Aktivität, die eine einzelne Arbeitsaufgabe verarbeitet.
-
AggregateResultsActivity, aggregateResultsoder aggregate_results: Eine Aktivität, die Ergebnisse aus allen parallelen Vorgängen aggregiert.
Orchestrator
Diese Orchestratorfunktion führt folgende Aktionen aus:
- Nimmt
rootDirectory als Eingabe an.
- Sie ruft eine Funktion auf, um unter
rootDirectory eine rekursive Liste mit Dateien abzurufen.
- Führt parallele Funktionsaufrufe aus, um jede Datei in Azure Blob Storage hochzuladen.
- Sie wartet, bis alle Uploads abgeschlossen wurden.
- Gibt die Gesamtanzahl der in Azure Blob Storage hochgeladenen Bytes zurück.
Im Folgenden wird der Code dargestellt, der die Orchestratorfunktion implementiert:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Beachten Sie die Zeile await Task.WhenAll(tasks);. Der Code wartet nicht auf die einzelnen Aufrufe E2_CopyFileToBlob, sodass sie parallel ausgeführt werden. Wenn der Orchestrator das Aufgabenarray an Task.WhenAll übergibt, wird eine Aufgabe zurückgegeben, die erst dann abgeschlossen wird, wenn alle Kopiervorgänge abgeschlossen sind. Wenn Sie mit der Task Parallel Library (TPL) in .NET vertraut sind, ist dieses Muster vertraut. Der Unterschied besteht darin, dass diese Aufgaben gleichzeitig auf mehreren virtuellen Computern ausgeführt werden können, und die Durable Functions Erweiterung stellt sicher, dass die End-to-End-Ausführung widerstandsfähig für die Prozesswiederverwendung ist.
Nachdem der Orchestrator abgewartet hat Task.WhenAll, sind alle Funktionsaufrufe abgeschlossen und geben Werte zurück. Bei jedem Aufruf von E2_CopyFileToBlob wird die Anzahl der hochgeladenen Bytes zurückgegeben. Berechnen Sie die Summe, indem Sie die Rückgabewerte hinzufügen.
V3-Programmiermodell
Die Funktion verwendet die Standarddatei function.json für Orchestratorfunktionen.
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Im Folgenden wird der Code dargestellt, der die Orchestratorfunktion implementiert:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Beachten Sie die Zeile yield context.df.Task.all(tasks);. Der Code liefert nicht die einzelnen Aufrufe von E2_CopyFileToBlob, daher werden sie parallel ausgeführt. Wenn der Orchestrator das Aufgabenarray an context.df.Task.all übermittelt, wird eine Aufgabe zurückgegeben, die erst abgeschlossen ist, wenn alle Kopiervorgänge abgeschlossen sind. Wenn Sie mit Promise.all in JavaScript vertraut sind, ist dies nicht neu für Sie. Der Unterschied besteht darin, dass diese Aufgaben gleichzeitig auf mehreren virtuellen Computern ausgeführt werden können, und die Durable Functions Erweiterung stellt sicher, dass die End-to-End-Ausführung widerstandsfähig für die Prozesswiederverwendung ist.
Hinweis
Obwohl Aufgaben ein ähnliches Konzept zugrunde liegt wie JavaScript-Zusagen, sollten für Orchestratorfunktionen context.df.Task.all und context.df.Task.any anstelle von Promise.all und Promise.race verwendet werden, um die Parallelisierung von Aufgaben zu verwalten.
Nach der Rückgabe des Orchestrators context.df.Task.allsind alle Funktionsaufrufe abgeschlossen und geben Werte zurück. Jeder Aufruf von E2_CopyFileToBlob gibt die Anzahl von hochgeladenen Bytes zurück. Die Summe der Gesamtbytezahl wird folglich berechnet, indem alle diese Rückgabewerte addiert werden.
V4-Programmiermodell
Im Folgenden wird der Code dargestellt, der die Orchestratorfunktion implementiert:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace(/\\/g, "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Beachten Sie die Zeile yield context.df.Task.all(tasks);. Sämtliche einzelnen Aufrufe der copyFileToBlob-Funktion wurden nicht ausgegeben, sodass sie parallel ausgeführt werden können. Wenn wir dieses Array von Aufgaben an context.df.Task.all übergeben, erhalten wir eine Aufgabe zurück, die erst nach Abschluss aller Kopiervorgänge abgeschlossen wird. Wenn Sie mit Promise.all in JavaScript vertraut sind, ist dies nicht neu für Sie. Der Unterschied besteht darin, dass diese Aufgaben gleichzeitig auf mehreren virtuellen Computern ausgeführt werden können, und die Durable Functions Erweiterung stellt sicher, dass die End-to-End-Ausführung widerstandsfähig für die Prozesswiederverwendung ist.
Hinweis
Obwohl Aufgaben ein ähnliches Konzept zugrunde liegt wie JavaScript-Zusagen, sollten für Orchestratorfunktionen context.df.Task.all und context.df.Task.any anstelle von Promise.all und Promise.race verwendet werden, um die Parallelisierung von Aufgaben zu verwalten.
Nach der Übergabe von context.df.Task.all wissen wir, dass alle Funktionsaufrufe abgeschlossen wurden und Werte an uns übertragen haben. Jeder Aufruf von copyFileToBlob gibt die Anzahl von hochgeladenen Bytes zurück. Die Summe der Gesamtbytezahl wird folglich berechnet, indem alle diese Rückgabewerte addiert werden.
Die Funktion verwendet die Standarddatei function.json für Orchestratorfunktionen.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Im Folgenden wird der Code dargestellt, der die Orchestratorfunktion implementiert:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Beachten Sie die Zeile yield context.task_all(tasks);. Der Code liefert nicht die einzelnen Aufrufe von E2_CopyFileToBlob, daher werden sie parallel ausgeführt. Wenn der Orchestrator das Aufgabenarray an context.task_all übermittelt, wird eine Aufgabe zurückgegeben, die erst abgeschlossen ist, wenn alle Kopiervorgänge abgeschlossen sind. Wenn Sie mit asyncio.gather in Python vertraut sind, ist dies für Sie nicht neu. Der Unterschied besteht darin, dass diese Aufgaben gleichzeitig auf mehreren virtuellen Computern ausgeführt werden können, und die Durable Functions Erweiterung stellt sicher, dass die End-to-End-Ausführung widerstandsfähig für die Prozesswiederverwendung ist.
Hinweis
Obwohl Aufgaben konzeptuell mit Python awaitables vergleichbar sind, sollten Orchestratorfunktionen yield sowie die context.task_all und context.task_any-APIs zum Verwalten der Aufgaben-Parallelisierung verwenden.
Nach der Rückgabe des Orchestrators context.task_allsind alle Funktionsaufrufe abgeschlossen und geben Werte zurück. Jeder Aufruf von E2_CopyFileToBlob gibt die Anzahl von hochgeladenen Bytes zurück. Die Summe der Gesamtbytezahl wird folglich berechnet, indem alle diese Rückgabewerte addiert werden.
Ein PowerShell-Beispiel ist noch nicht verfügbar.
Ein Java Beispiel ist noch nicht verfügbar.
Der Orchestrator führt die folgenden Aktionen aus:
- Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
- Fans out, indem sie eine Aufgabe für jede Arbeitsaufgabe erstellen und parallel verarbeiten.
- Wartet auf den Abschluss aller parallelen Aufgaben.
- Fans durch Aggregieren der Ergebnisse.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
Verwenden Sie Task.WhenAll(), um auf den Abschluss aller parallelen Aufgaben zu warten. Das Durable Task SDK stellt sicher, dass die Aufgaben gleichzeitig auf mehreren Computern ausgeführt werden können, und die Ausführung ist robust für Neustarts.
import {
OrchestrationContext,
TOrchestrator,
whenAll,
} from "@microsoft/durabletask-js";
const fanOutFanInOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
workItems: string[]
): any {
// Fan-out: create a task for each work item in parallel
const tasks = workItems.map((item) => ctx.callActivity(processWorkItem, item));
// Wait for all parallel tasks to complete
const results: number[] = yield whenAll(tasks);
// Fan-in: aggregate all results
const aggregatedResult = yield ctx.callActivity(aggregateResults, results);
return aggregatedResult;
};
Verwenden Sie whenAll(), um auf den Abschluss aller parallelen Aufgaben zu warten. Das Durable Task SDK stellt sicher, dass die Aufgaben gleichzeitig auf mehreren Computern ausgeführt werden können, und die Ausführung ist robust für Neustarts.
from durabletask import task
def fan_out_fan_in_orchestrator(ctx: task.OrchestrationContext, work_items: list) -> dict:
"""Orchestrator that demonstrates fan-out/fan-in pattern."""
# Fan-out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity(process_work_item, input=item))
# Wait for all tasks to complete
results = yield task.when_all(parallel_tasks)
# Fan-in: Aggregate all the results
final_result = yield ctx.call_activity(aggregate_results, input=results)
return final_result
Verwenden Sie task.when_all(), um auf den Abschluss aller parallelen Aufgaben zu warten. Das Durable Task SDK stellt sicher, dass die Aufgaben gleichzeitig auf mehreren Computern ausgeführt werden können, und die Ausführung ist robust für Neustarts.
Dieses Beispiel ist für .NET, JavaScript, Java und Python verfügbar.
import com.microsoft.durabletask.*;
import java.util.List;
import java.util.stream.Collectors;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
// Fan-out: Create a task for each input item
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
// Wait for all parallel tasks to complete
List<Integer> allResults = ctx.allOf(tasks).await();
// Fan-in: Aggregate results
int totalCount = allResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalCount);
};
}
})
.build();
Verwenden Sie ctx.allOf(tasks).await(), um auf den Abschluss aller parallelen Aufgaben zu warten. Das Durable Task SDK stellt sicher, dass die Aufgaben gleichzeitig auf mehreren Computern ausgeführt werden können, und die Ausführung ist robust für Neustarts.
Aktivitäten
Die Hilfsaktivitätsfunktionen sind normale Funktionen, die die activityTrigger Bindung verwenden.
Aktivitätsfunktion „E2_GetFileList“
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
V3-Programmiermodell
Die function.json Datei E2_GetFileList sieht wie im folgenden Beispiel aus:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
Dies ist die Implementierung:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
Die Funktion verwendet das Modul readdirp, Version 2.x, um rekursiv die Verzeichnisstruktur zu lesen.
V4-Programmiermodell
Dies ist die Implementierung der getFileList Aktivitätsfunktion:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
Die Funktion verwendet das readdirp-Modul (Version 3.x), um die Verzeichnisstruktur rekursiv zu lesen.
Die function.json Datei E2_GetFileList sieht wie im folgenden Beispiel aus:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
Dies ist die Implementierung:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
PowerShell-Beispiel wird in Kürze verfügbar sein.
Java Beispiel in Kürze verfügbar.
Hinweis
Fügen Sie diesen Code nicht in die Orchestratorfunktion ein. Orchestrator-Funktionen sollten keine E/A ausführen, einschließlich des lokalen Dateisystemzugriffs. Weitere Informationen finden Sie unter Codeeinschränkungen für Orchestratorfunktionen.
Aktivitätsfunktion „E2_CopyFileToBlob“
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Hinweis
Um den Beispielcode auszuführen, installieren Sie das Microsoft.Azure.WebJobs.Extensions.Storage NuGet-Paket.
Die Funktion verwendet Azure Functions Bindungsfeatures wie den Binder Parameter. Für diese Anleitung benötigen Sie solche Details nicht.
V3-Programmiermodell
Die Datei function.json für E2_CopyFileToBlob ist ebenso einfach:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
Die JavaScript-Implementierung verwendet das Azure Storage SDK für Node, um die Dateien in Azure Blob Storage hochzuladen.
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
V4-Programmiermodell
Die JavaScript-Implementierung von copyFileToBlob verwendet eine Azure Storage Ausgabebindung, um die Dateien in Azure Blob Storage hochzuladen.
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
Die Datei function.json für E2_CopyFileToBlob ist ebenso einfach:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
Die Python-Implementierung verwendet das Azure Storage SDK für Python, um die Dateien in Azure Blob Storage hochzuladen.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
PowerShell-Beispiel wird in Kürze verfügbar sein.
Java Beispiel in Kürze verfügbar.
Die Implementierung lädt die Datei vom Datenträger und streamt den Inhalt asynchron in einen Blob mit demselben Namen im backups Container. Die Funktion gibt die Anzahl der in den Speicher kopierten Bytes zurück. Der Orchestrator verwendet diesen Wert, um die Aggregatsumme zu berechnen.
Hinweis
In diesem Beispiel werden E/A-Vorgänge in eine activityTrigger Funktion verschoben. Die Arbeit kann auf mehreren Computern ausgeführt werden und unterstützt die Fortschrittsüberprüfung. Wenn der Hostprozess endet, wissen Sie, welche Uploads abgeschlossen sind.
Aktivitäten erledigen die Arbeit. Im Gegensatz zu Orchestratoren können Aktivitäten E/A-Vorgänge und nicht deterministische Logik ausführen.
Arbeitsaufgabe verarbeiten
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const processWorkItem = async (
_ctx: ActivityContext,
item: string
): Promise<number> => {
console.log(`Processing work item: "${item}"`);
return item.length;
};
Im Gegensatz zu Orchestratoren können Aktivitäten E/A-Vorgänge wie HTTP-Aufrufe, Datenbankabfragen und Dateizugriff ausführen.
from durabletask import task
def process_work_item(ctx: task.ActivityContext, item: int) -> dict:
"""Activity that processes a single work item."""
# Process the work item (this is where you do the actual work)
result = item * item
return {"item": item, "result": result}
Dieses Beispiel wird für .NET, JavaScript, Java und Python gezeigt.
import java.util.StringTokenizer;
// Activity registration
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
Aggregierte Ergebnisaktivität
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const aggregateResults = async (
_ctx: ActivityContext,
results: number[]
): Promise<object> => {
const total = results.reduce((sum, val) => sum + val, 0);
return {
totalItems: results.length,
sum: total,
average: results.length > 0 ? total / results.length : 0,
};
};
Im Gegensatz zu Orchestratoren können Aktivitäten E/A-Vorgänge wie HTTP-Aufrufe, Datenbankabfragen und Dateizugriff ausführen.
from durabletask import task
def aggregate_results(ctx: task.ActivityContext, results: list) -> dict:
"""Activity that aggregates results from multiple work items."""
sum_result = sum(item["result"] for item in results)
return {
"total_items": len(results),
"sum": sum_result,
"average": sum_result / len(results) if results else 0
}
Dieses Beispiel wird für .NET, JavaScript, Java und Python gezeigt.
Im Java Beispiel aggregiert der Orchestrator Ergebnisse, nachdem ctx.allOf(tasks).await() zurückgegeben wurde.
Beispiel ausführen
Starten Sie die Orchestrierung auf Windows, indem Sie die folgende HTTP POST-Anforderung senden:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternativ können Sie in einer Linux-Funktions-App die Orchestrierung starten, indem Sie die folgende HTTP POST-Anforderung senden. Python wird derzeit unter Linux für App Service ausgeführt:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Hinweis
Die HttpStart Funktion erwartet JSON. Fügen Sie den Content-Type: application/json Header ein, und codieren Sie den Verzeichnispfad als JSON-Zeichenfolge. Der HTTP-Codeausschnitt geht davon aus, dasshost.json einen Eintrag enthält, der das Standardpräfix api/ aus allen HTTP-Triggerfunktions-URLs entfernt. Suchen Sie das Markup für diese Konfiguration im Beispiel host.json Datei.
Diese HTTP-Anforderung löst den Orchestrator E2_BackupSiteContent aus und übergibt die Zeichenfolge D:\home\LogFiles als Parameter. Die Antwort verfügt über einen Link, um den Status des Sicherungsvorgangs zu überprüfen:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
Je nach Anzahl der Protokolldateien in Ihrer Funktions-App kann dieser Vorgang mehrere Minuten dauern. Rufen Sie den neuesten Status ab, indem Sie die URL im Location Header der vorherigen HTTP 202-Antwort abfragen:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
In diesem Fall wird die Funktion weiterhin ausgeführt. Die Antwort zeigt die im Orchestratorzustand gespeicherte Eingabe und die uhrzeit der letzten Aktualisierung an. Verwenden Sie den Location Kopfzeilenwert, um den Abschluss abzufragen. Wenn der Status "Abgeschlossen" ist, ähnelt die Antwort dem folgenden Beispiel:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Die Antwort zeigt, dass die Orchestrierung abgeschlossen ist und die ungefähre Endzeit. Das output Feld gibt an, dass die Orchestrierung ca. 450 KB Protokolle hochgeladen hat.
So führen Sie das Beispiel aus:
Starten Sie den Emulator "Durable Task Scheduler " für die lokale Entwicklung.
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
Starten Sie den Worker , um den Orchestrator und die Aktivitäten zu registrieren.
Führen Sie den Client aus, um eine Orchestrierung mit einer Liste von Arbeitsaufgaben zu planen:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
import {
DurableTaskAzureManagedClientBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const connectionString =
process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING ||
"Endpoint=http://localhost:8080;Authentication=None;TaskHub=default";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const workItems = ["item1", "item2", "item3", "item4", "item5"];
const instanceId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator, workItems);
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
console.log(`Result: ${state?.serializedOutput}`);
Erstellen Sie die DurableTaskAzureManagedClientBuilder mithilfe einer Verbindungszeichenfolge für den dauerhaften Aufgabenplaner. Verwenden Sie scheduleNewOrchestration zum Starten einer Orchestrierung und waitForOrchestrationCompletion zum Warten auf den Abschluss.
# Schedule the orchestration with a list of work items
work_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
instance_id = client.schedule_new_orchestration(fan_out_fan_in_orchestrator, input=work_items)
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Result: {result.serialized_output}")
Dieses Beispiel wird für .NET, JavaScript, Java und Python gezeigt.
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
// Schedule the orchestration with a list of strings
List<String> sentences = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"Always remember that you are absolutely unique.");
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(sentences));
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), true);
System.out.println("Total word count: " + result.readOutputAs(int.class));
Nächste Schritte
Dieses Beispiel zeigt das Lüfter-Out-/Fan-In-Muster. Das nächste Beispiel zeigt, wie das Monitormuster mit dauerhaften Timern implementiert wird.