Fan-out/fan-in voert meerdere functies parallel uit en voegt vervolgens de resultaten samen. In dit artikel wordt een voorbeeld weergegeven waarin Durable Functions wordt gebruikt om een back-up te maken van bepaalde of alle site-inhoud van een app naar Azure Storage.
Vereiste voorwaarden
V3-programmeermodel
V4-programmeermodel
Fan-out/fan-in voert meerdere activiteiten parallel uit en voegt vervolgens de resultaten samen. In dit artikel wordt beschreven hoe u het patroon implementeert met behulp van de Durable Task SDK's voor .NET, JavaScript, Python en Java.
Overzicht van scenario
In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map (recursief) naar blobopslag. Ze tellen ook het totale aantal geüploade bytes.
Eén functie kan alles verwerken, maar niet schalen. Een uitvoering van één functie wordt uitgevoerd op één virtuele machine (VM), dus de doorvoer is beperkt tot die VM. Betrouwbaarheid is een andere zorg. Als het proces halverwege mislukt of langer dan vijf minuten duurt, kan de back-up een gedeeltelijk voltooide status hebben. Vervolgens start u de back-up opnieuw op.
Een krachtigere benadering is het gebruik van twee afzonderlijke functies: een opsomming van de bestanden en voegt bestandsnamen toe aan een wachtrij en de andere leest uit de wachtrij en uploadt de bestanden naar blob-opslag. Deze aanpak verbetert de doorvoer en betrouwbaarheid, maar u moet de wachtrij instellen en beheren. Belangrijker is dat deze aanpak complexiteit toevoegt voor statusbeheer en coördinatie, zoals het rapporteren van het totale aantal geüploade bytes.
Durable Functions biedt al deze voordelen met weinig overhead.
In het volgende voorbeeld verwerkt de orchestrator meerdere werkitems parallel en worden de resultaten vervolgens samengevoegd. Dit patroon is handig wanneer u het volgende moet doen:
- Een batch met items verwerken waarbij elk item onafhankelijk kan worden verwerkt
- Werk verdelen over meerdere computers voor betere doorvoer
- Resultaten van alle parallelle bewerkingen aggregeren
Zonder het fan-out/fan-in patroon verwerkt u items sequentieel, waardoor de doorvoer wordt beperkt of u uw eigen wachtrij- en coördinatielogica beheert, wat complexiteit toevoegt.
De DURABLE Task SDK's verwerken parallellisatie en coördinatie, dus het patroon is eenvoudig te implementeren.
De functies
In dit artikel worden de functies in de voorbeeld-app beschreven:
-
E2_BackupSiteContent: Een orchestratorfunctie die E2_GetFileList aanroept om een lijst met bestanden op te halen waarvan een back-up moet worden gemaakt, en die vervolgens E2_CopyFileToBlob aanroept voor elk bestand.
-
E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
-
E2_CopyFileToBlob: een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.
In dit artikel worden de onderdelen in de voorbeeldcode beschreven:
-
ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator of FanOutFanIn_WordCount: Een orchestrator die activiteiten parallel uitwaaiert, wacht totdat alle activiteiten zijn voltooid, en vervolgens inwaaiert door de resultaten te aggregeren.
-
ProcessWorkItemActivity, processWorkItem, process_work_itemof CountWords: een activiteit die één werkitem verwerkt.
-
AggregateResultsActivity, aggregateResultsof aggregate_results: een activiteit die resultaten van alle parallelle bewerkingen samenvoegt.
Orchestrator
Deze orchestratorfunctie doet het volgende:
- Neemt
rootDirectory als invoer.
- Roept een functie aan om een recursieve lijst met bestanden onder
rootDirectoryte krijgen.
- Hiermee worden parallelle functie-aanroepen uitgevoerd om elk bestand te uploaden naar Azure Blob Storage.
- Wacht totdat alle uploads zijn voltooid.
- Retourneert het totale aantal geüploade bytes naar Azure Blob Storage.
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Let op de await Task.WhenAll(tasks); regel. De code wacht niet op de afzonderlijke aanroepen, E2_CopyFileToBlobzodat ze parallel worden uitgevoerd. Wanneer de orchestrator de takenlijst naar Task.WhenAll doorgeeft, wordt er een taak geretourneerd die pas wordt voltooid als alle kopieerbewerkingen zijn afgerond. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit patroon bekend. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Nadat de orchestrator wacht Task.WhenAll, zijn alle functieaanroepen voltooid en worden de waarden geretourneerd. Elke aanroep voor E2_CopyFileToBlob geeft het aantal geüploade bytes terug. Bereken het totaal door de retourwaarden toe te voegen.
V3-programmeermodel
De functie maakt gebruik van de standaard function.json voor orchestratorfuncties.
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. De code levert de afzonderlijke aanroepen niet op E2_CopyFileToBlob, zodat ze parallel worden uitgevoerd. Wanneer de orchestrator de takenlijst naar context.df.Task.all doorgeeft, wordt er een taak geretourneerd die pas wordt voltooid als alle kopieerbewerkingen zijn afgerond. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Opmerking
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloften, moeten orchestrator-functies worden gebruikt context.df.Task.all en context.df.Task.any in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Nadat de orchestrator context.df.Task.all heeft vrijgegeven, zijn alle functieaanroepen voltooid en worden de terugkeerwaarden geretourneerd. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.
V4-programmeermodel
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace(/\\/g, "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. Alle afzonderlijke aanroepen naar de copyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken context.df.Task.alldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Opmerking
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloften, moeten orchestrator-functies worden gebruikt context.df.Task.all en context.df.Task.any in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Na het opleveren van context.df.Task.all, weten we dat alle functieaanroepen zijn voltooid en waarden aan ons hebben teruggegeven. Elke aanroep om copyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.
De functie maakt gebruik van de standaard function.json voor orchestratorfuncties.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Let op de yield context.task_all(tasks); regel. De code levert de afzonderlijke aanroepen niet op E2_CopyFileToBlob, zodat ze parallel worden uitgevoerd. Wanneer de orchestrator de takenlijst naar context.task_all doorgeeft, wordt er een taak geretourneerd die pas wordt voltooid als alle kopieerbewerkingen zijn afgerond. Als u bekend bent met asyncio.gather in Python, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Opmerking
Hoewel taken conceptueel vergelijkbaar zijn met Python awaitables, moeten orchestratorfuncties yield en de context.task_all en context.task_any API's gebruiken om taakparallellisatie te beheren.
Nadat de orchestrator context.task_all heeft vrijgegeven, zijn alle functieaanroepen voltooid en worden de terugkeerwaarden geretourneerd. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, zodat we het totale byteaantal kunnen berekenen door alle retourwaarden samen te voegen.
Een PowerShell-voorbeeld is nog niet beschikbaar.
Er is nog geen Java voorbeeld beschikbaar.
De orchestrator doet het volgende:
- Hiermee wordt een lijst met werkitems als invoer gebruikt.
- Verspreidt zich door voor elk werkitem een taak te maken en ze parallel te verwerken.
- Wacht totdat alle parallelle taken zijn voltooid.
- Fans binnenhalen door de resultaten te aggregeren.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
Gebruik Task.WhenAll() dit om te wachten tot alle parallelle taken zijn voltooid. De Durable Task SDK zorgt ervoor dat de taken gelijktijdig op meerdere computers kunnen worden uitgevoerd en dat de uitvoering bestand is tegen het opnieuw opstarten van processen.
import {
OrchestrationContext,
TOrchestrator,
whenAll,
} from "@microsoft/durabletask-js";
const fanOutFanInOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
workItems: string[]
): any {
// Fan-out: create a task for each work item in parallel
const tasks = workItems.map((item) => ctx.callActivity(processWorkItem, item));
// Wait for all parallel tasks to complete
const results: number[] = yield whenAll(tasks);
// Fan-in: aggregate all results
const aggregatedResult = yield ctx.callActivity(aggregateResults, results);
return aggregatedResult;
};
Gebruik whenAll() dit om te wachten tot alle parallelle taken zijn voltooid. De Durable Task SDK zorgt ervoor dat de taken gelijktijdig op meerdere computers kunnen worden uitgevoerd en dat de uitvoering bestand is tegen het opnieuw opstarten van processen.
from durabletask import task
def fan_out_fan_in_orchestrator(ctx: task.OrchestrationContext, work_items: list) -> dict:
"""Orchestrator that demonstrates fan-out/fan-in pattern."""
# Fan-out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity(process_work_item, input=item))
# Wait for all tasks to complete
results = yield task.when_all(parallel_tasks)
# Fan-in: Aggregate all the results
final_result = yield ctx.call_activity(aggregate_results, input=results)
return final_result
Gebruik task.when_all() dit om te wachten tot alle parallelle taken zijn voltooid. De Durable Task SDK zorgt ervoor dat de taken gelijktijdig op meerdere computers kunnen worden uitgevoerd en dat de uitvoering bestand is tegen het opnieuw opstarten van processen.
Dit voorbeeld is beschikbaar voor .NET, JavaScript, Java en Python.
import com.microsoft.durabletask.*;
import java.util.List;
import java.util.stream.Collectors;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
// Fan-out: Create a task for each input item
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
// Wait for all parallel tasks to complete
List<Integer> allResults = ctx.allOf(tasks).await();
// Fan-in: Aggregate results
int totalCount = allResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalCount);
};
}
})
.build();
Gebruik ctx.allOf(tasks).await() dit om te wachten tot alle parallelle taken zijn voltooid. De Durable Task SDK zorgt ervoor dat de taken gelijktijdig op meerdere computers kunnen worden uitgevoerd en dat de uitvoering bestand is tegen het opnieuw opstarten van processen.
Activiteiten
De helperactiviteitsfuncties zijn reguliere functies die gebruikmaken van de activityTrigger binding.
E2_GetFileList activiteitsfunctie
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
V3-programmeermodel
Het bestandfunction.jsonE2_GetFileList ziet eruit zoals in het volgende voorbeeld:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
Dit is de implementatie:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
De functie gebruikt de readdirp module, versie 2.x, om de mapstructuur recursief te lezen.
V4-programmeermodel
Dit is de implementatie van de getFileList activiteitsfunctie:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
De functie gebruikt de readdirp module (versie 3.x) om de mapstructuur recursief te lezen.
Het bestandfunction.jsonE2_GetFileList ziet eruit zoals in het volgende voorbeeld:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
Dit is de implementatie:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
PowerShell-voorbeeld komt binnenkort beschikbaar.
Java voorbeeld binnenkort beschikbaar.
Opmerking
Plaats deze code niet in de orchestratorfunctie. Orchestrator-functies mogen geen I/O uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Codebeperkingen voor Orchestrator-functies voor meer informatie.
E2_CopyFileToBlob activiteitsfunctie
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Opmerking
Als u de voorbeeldcode wilt uitvoeren, installeert u het NuGet-pakket Microsoft.Azure.WebJobs.Extensions.Storage.
De functie gebruikt Azure Functions bindingsfuncties zoals de parameter Binder. U hebt deze details niet nodig voor deze stap-voor-stap instructies.
V3-programmeermodel
Het function.json-bestand voor E2_CopyFileToBlob is vergelijkbaar eenvoudig:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
De JavaScript-implementatie maakt gebruik van de Azure Storage SDK voor Node om de bestanden te uploaden naar Azure Blob Storage.
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
V4-programmeermodel
De JavaScript-implementatie van copyFileToBlob maakt gebruik van een Azure Storage uitvoerbinding om de bestanden te uploaden naar Azure Blob Storage.
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
Het function.json-bestand voor E2_CopyFileToBlob is vergelijkbaar eenvoudig:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
De Python-implementatie maakt gebruik van de Azure Storage SDK voor Python om de bestanden te uploaden naar Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
PowerShell-voorbeeld komt binnenkort beschikbaar.
Java voorbeeld binnenkort beschikbaar.
De implementatie laadt het bestand vanaf de schijf en streamt de inhoud asynchroon naar een blob met dezelfde naam in de backups container. De functie retourneert het aantal bytes dat naar de opslag is gekopieerd. De orchestrator gebruikt die waarde om de geaggregeerde som te berekenen.
Opmerking
In dit voorbeeld worden I/O-bewerkingen verplaatst naar een activityTrigger functie. Het werk kan worden uitgevoerd op meerdere computers en ondersteunt voortgangscontrolepunten. Als het hostproces eindigt, weet u welke uploads zijn voltooid.
Activiteiten doen het werk. In tegenstelling tot orchestrators kunnen activiteiten I/O-bewerkingen en niet-deterministische logica uitvoeren.
Verwerk werkitemactiviteit
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const processWorkItem = async (
_ctx: ActivityContext,
item: string
): Promise<number> => {
console.log(`Processing work item: "${item}"`);
return item.length;
};
In tegenstelling tot orchestrators kunnen activiteiten I/O-bewerkingen uitvoeren, zoals HTTP-aanroepen, databasequery's en bestandstoegang.
from durabletask import task
def process_work_item(ctx: task.ActivityContext, item: int) -> dict:
"""Activity that processes a single work item."""
# Process the work item (this is where you do the actual work)
result = item * item
return {"item": item, "result": result}
Dit voorbeeld wordt weergegeven voor .NET, JavaScript, Java en Python.
import java.util.StringTokenizer;
// Activity registration
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
Activiteit van geaggregeerde resultaten
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const aggregateResults = async (
_ctx: ActivityContext,
results: number[]
): Promise<object> => {
const total = results.reduce((sum, val) => sum + val, 0);
return {
totalItems: results.length,
sum: total,
average: results.length > 0 ? total / results.length : 0,
};
};
In tegenstelling tot orchestrators kunnen activiteiten I/O-bewerkingen uitvoeren, zoals HTTP-aanroepen, databasequery's en bestandstoegang.
from durabletask import task
def aggregate_results(ctx: task.ActivityContext, results: list) -> dict:
"""Activity that aggregates results from multiple work items."""
sum_result = sum(item["result"] for item in results)
return {
"total_items": len(results),
"sum": sum_result,
"average": sum_result / len(results) if results else 0
}
Dit voorbeeld wordt weergegeven voor .NET, JavaScript, Java en Python.
In het Java voorbeeld voegt de orkestrator resultaten samen nadat ctx.allOf(tasks).await() teruggeeft.
Het voorbeeld uitvoeren
Start de orchestratie op Windows door de volgende HTTP POST-verzoek te verzenden.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
U kunt ook in een Linux-functie-app de indeling starten door de volgende HTTP POST-aanvraag te verzenden. Python wordt momenteel uitgevoerd op Linux voor App Service:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Opmerking
De HttpStart functie verwacht JSON. Neem de Content-Type: application/json header op en codeer het mappad als een JSON-tekenreeks. In het HTTP-fragment wordt ervan uitgegaan dathost.json een vermelding heeft waarmee het standaardvoorvoegsel api/ wordt verwijderd uit alle URL's van de HTTP-triggerfunctie. Zoek de opmaak voor deze configuratie in het voorbeeldbestandhost.json .
Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord heeft een koppeling om de status van de back-upbewerking te controleren:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
Afhankelijk van het aantal logboekbestanden in uw functie-app kan het enkele minuten duren voordat deze bewerking is voltooid. Haal de meest recente status op door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
In dit geval wordt de functie nog steeds uitgevoerd. Het antwoord toont de invoer die is opgeslagen in de orchestratorstatus en de laatst bijgewerkte tijd. Gebruik de Location headerwaarde om de voltooiing te controleren. Wanneer de status 'Voltooid' is, lijkt het antwoord op het volgende voorbeeld:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Het antwoord laat zien dat de orkestratie is voltooid en de geschatte tijd om te voltooien. Het output veld geeft aan dat de indeling ongeveer 450 kB aan logboeken heeft geüpload.
Ga als volgt te werk om het voorbeeld uit te voeren:
Start de Durable Task Scheduler-emulator voor lokale ontwikkeling.
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
Start de worker om de orkestrator en activiteiten te registreren.
Voer de client uit om een orkestratie in te plannen met een werkitemlijst:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
import {
DurableTaskAzureManagedClientBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const connectionString =
process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING ||
"Endpoint=http://localhost:8080;Authentication=None;TaskHub=default";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const workItems = ["item1", "item2", "item3", "item4", "item5"];
const instanceId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator, workItems);
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
console.log(`Result: ${state?.serializedOutput}`);
Maak de DurableTaskAzureManagedClientBuilder met behulp van een verbindingsreeks naar de Durable Task Scheduler. Gebruik scheduleNewOrchestration om een orkestratie te starten en waitForOrchestrationCompletion om te wachten op voltooiing.
# Schedule the orchestration with a list of work items
work_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
instance_id = client.schedule_new_orchestration(fan_out_fan_in_orchestrator, input=work_items)
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Result: {result.serialized_output}")
Dit voorbeeld wordt weergegeven voor .NET, JavaScript, Java en Python.
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
// Schedule the orchestration with a list of strings
List<String> sentences = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"Always remember that you are absolutely unique.");
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(sentences));
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), true);
System.out.println("Total word count: " + result.readOutputAs(int.class));
Volgende stappen
In dit voorbeeld ziet u het fan-out/fan-in patroon. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met duurzame timers.