سيناريو توزيع المهام إلى عدة وجهات وتوزيع المهام إلى الواجهة في وظائف دائمة - مثال النسخ الاحتياطي للسحابة
مقالة
Fan-out/Fan-in يشير إلى نمط تنفيذ وظائف متعددة في وقت واحد ومن ثم تنفيذ بعض التجميع على النتائج. توضح هذه المقالة عينة تستخدم "دوال دائمة" لتنفيذ سيناريو Fan-in/Fan-out. العينة هي وظيفة دائمة تدعم كل أو بعض محتوى موقع التطبيق في تخزين Azure.
إشعار
يتوفر الإصدار 4 من نموذج البرمجة Node.js ل Azure Functions بشكل عام. تم تصميم نموذج v4 الجديد للحصول على تجربة أكثر مرونة وبديهية لمطوري JavaScript وTypeScript. تعرف على المزيد حول الاختلافات بين v3 وv4 في دليل الترحيل.
في القصاصات البرمجية التالية، يشير JavaScript (PM4) إلى نموذج البرمجة V4، التجربة الجديدة.
في هذه العينة، تقوم الدوال بتحميل كافة الملفات ضمن دليل محدد بشكل متكرر في تخزين blob. كما أنها تحسب العدد الإجمالي للبايتات التي تم تحميلها.
من الممكن كتابة وظيفة واحدة تهتم بكل شيء. المشكلة الرئيسية التي قد تواجهها هي قابلية التوسع. يمكن تشغيل تنفيذ دالة واحدة فقط على جهاز افتراضي واحد، لذلك سيتم تقييد الإنتاجية بواسطة الإنتاجية من هذا VM الوحيد. مشكلة أخرى هي الاعتمادية. إذا كان هناك فشل في منتصف الطريق، أو إذا كانت العملية بأكملها تستغرق أكثر من 5 دقائق، فقد تفشل النسخة الاحتياطية في حالة اكتمال جزئي. ثم تحتاج بعد ذلك إلى إعادة تشغيل.
سيكون الأسلوب الأكثر قوة هو كتابة دالتين منتظمين: واحدة لذكر الملفات وإضافة أسماء الملفات إلى قائمة انتظار، وأخرى ستقرأ من قائمة الانتظار وتحميل الملفات إلى تخزين blob. هذا الأسلوب هو الأفضل من حيث الإنتاجية والاعتمادية، لكنه يتطلب منك توفير وإدارة قائمة الانتظار. والأهم من ذلك، يتم إدخال تعقيد كبير من حيث إدارة الدولةوالتنسيق إذا كنت ترغب في القيام بأي شيء أكثر من ذلك، مثل الإبلاغ عن العدد الإجمالي للبايت التي تم تحميلها.
يمنحك أسلوب الدوال الدائمة كل من الفوائد المذكورة مع نفقات عامة منخفضة جدًّا.
الدوال
توضح هذه المقالة الدوال التالية في تطبيق العينة:
E2_BackupSiteContentدالة منسق التي تستدعي E2_GetFileList للحصول على قائمة الملفات لإجراء نسخة احتياطية، ثم يستدعي E2_CopyFileToBlob إجراء نسخة احتياطية من كل ملف.
E2_GetFileListدالة نشاط تقوم بإرجاع قائمة الملفات في دليل.
E2_CopyFileToBlobدالة نشاط تقوم بعمل نسخة احتياطية من ملف واحد إلى تخزين Azure Blob.
E2_BackupSiteContent دالة المنسق
تقوم دالة التزامن هذه بما يلي بشكل أساسي:
يأخذ rootDirectory قيمة كضابط إدخال.
استدعاء دالة للحصول على قائمة متكررة من الملفات ضمن rootDirectory .
إجراء استدعاءات دالة متوازية متعددة لتحميل كل ملف في تخزين Azure Blob.
ينتظر اكتمال كافة التحميلات.
إرجاع مجموع البايتات التي تم تحميلها إلى تخزين Azure Blob.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
لاحظ السطر await Task.WhenAll(tasks);. لم يتم انتظار كل المكالمات الفردية إلى الدالة E2_CopyFileToBlob، مما يسمح لهم بالتشغيل بشكل متوازٍ. عندما نمرر هذه المجموعة من المهام إلى Task.WhenAll، نستعيد مهمة لن تكتمل حتى تكتمل جميع عمليات النسخ. إذا كنت معتادًا على مكتبة المهام المتوازية (TPL) في .NET، فهذا ليس جديدًا عليك. الفرق هو أن هذه المهام يمكن أن تعمل على أجهزة افتراضية متعددة في وقت واحد، ويضمن ملحق "وظائف دائمة" أن التنفيذ من طرف إلى طرف مرن لمعالجة إعادة التدوير.
بعد انتظار من Task.WhenAll ، ونحن نعلم أن جميع مكالمات الدالة قد أكملت وعادت القيم مرة أخرى إلينا. كل استدعاء E2_CopyFileToBlob لإرجاع عدد وحدات البايت التي تم تحميلها، لذا فإن حساب مجموع عدد البايت هو مسألة إضافة لكل هذه القيم معًا.
تستخدم الدالة function.js القياسية لدالات التزامن.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
لاحظ السطر yield context.df.Task.all(tasks);. لم يتم انتظار كل المكالمات الفردية إلى الدالة E2_CopyFileToBlob، مما يسمح لهم بالتشغيل بشكل متواز. عندما نمرر هذه المجموعة من المهام إلى context.df.Task.all، نستعيد مهمة لن تكتمل حتى تكتمل جميع عمليات النسخ. إذا كنت على دراية Promise.allبـ جافا سكريبت، سيكون هذا غير جديد لك. الفرق هو أن هذه المهام يمكن أن تعمل على أجهزة افتراضية متعددة في وقت واحد، ويضمن ملحق "وظائف دائمة" أن التنفيذ من طرف إلى طرف مرن لمعالجة إعادة التدوير.
إشعار
على الرغم من أن المهام تشبه من الناحية المفاهيمية وعود JavaScript ، يجب أن تستخدم وظائف التزامن context.df.Task.allcontext.df.Task.any وبدلًا من وإدارة Promise.allPromise.race توازي المهام.
بعد انتظار من context.df.Task.all، ونحن نعلم أن جميع مكالمات الدالة قد أكملت وعادت القيم مرة أخرى إلينا. كل استدعاء E2_CopyFileToBlob لإرجاع عدد وحدات البايت التي تم تحميلها، لذا فإن حساب مجموع عدد البايت هو مسألة إضافة لكل هذه القيم معًا.
هذا هو كود البرمجيات الذي ينفذ دالة التزامن:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
لاحظ السطر yield context.df.Task.all(tasks);. لم يتم انتظار كل المكالمات الفردية إلى الدالة copyFileToBlob، مما يسمح لهم بالتشغيل بشكل متواز. عندما نمرر هذه المجموعة من المهام إلى context.df.Task.all، نستعيد مهمة لن تكتمل حتى تكتمل جميع عمليات النسخ. إذا كنت على دراية Promise.allبـ جافا سكريبت، سيكون هذا غير جديد لك. الفرق هو أن هذه المهام يمكن أن تعمل على أجهزة افتراضية متعددة في وقت واحد، ويضمن ملحق "وظائف دائمة" أن التنفيذ من طرف إلى طرف مرن لمعالجة إعادة التدوير.
إشعار
على الرغم من أن المهام مشابهة من الناحية المفاهيمية لوعود JavaScript، يجب أن تستخدم context.df.Task.all وظائف المنسق بدلا context.df.Task.any من Promise.all وإدارة Promise.race توازي المهام.
بعد انتظار من context.df.Task.all، ونحن نعلم أن جميع مكالمات الدالة قد أكملت وعادت القيم مرة أخرى إلينا. كل استدعاء copyFileToBlob لإرجاع عدد وحدات البايت التي تم تحميلها، لذا فإن حساب مجموع عدد البايت هو مسألة إضافة لكل هذه القيم معًا.
تستخدم الدالة function.js القياسية لدالات التزامن.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
لاحظ السطر yield context.task_all(tasks);. لم يتم انتظار كل المكالمات الفردية إلى الدالة E2_CopyFileToBlob، مما يسمح لهم بالتشغيل بشكل متواز. عندما نمرر هذه المجموعة من المهام إلى context.task_all، نستعيد مهمة لن تكتمل حتى تكتمل جميع عمليات النسخ. إذا كنت على دراية asyncio.gatherبـ جافا سكريبت، سيكون هذا غير جديد لك. الفرق هو أن هذه المهام يمكن أن تعمل على أجهزة افتراضية متعددة في وقت واحد، ويضمن ملحق "وظائف دائمة" أن التنفيذ من طرف إلى طرف مرن لمعالجة إعادة التدوير.
إشعار
على الرغم من أن المهام تشبه من الناحية المفاهيمية وعود JavaScript، يجب أن تستخدم وظائف التزامن واجهات برمجة التطبيقات yield بالإضافة إلى context.task_all وcontext.task_any لإدارة توازي المهام.
بعد انتظار من context.task_all، ونحن نعلم أن جميع مكالمات الدالة قد أكملت وعادت القيم مرة أخرى إلينا. كل استدعاء E2_CopyFileToBlob لإرجاع عدد وحدات البايت التي تم تحميلها، لذا فإن حساب مجموع عدد البايت هو مسألة إضافة لكل هذه القيم معًا.
دالات نشاط المساعد
وظائف نشاط المساعد، كما هو الحال مع عينات أخرى، هي مجرد دوال منتظمة تستخدم activityTrigger ربط الإطلاق.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
إشعار
قد تتساءل لماذا لا يمكنك وضع هذا الرمز مباشرة في دالة التزامن. يمكنك، ولكن هذا من شأنه كسر إحدى القواعد الأساسية لدوال التزامن، وهي أنه لا ينبغي لهم أبدًا القيام بالإدخال/أو الإخراج، بما في ذلك الوصول إلى نظام الملفات المحلي. لمزيد من المعلومات، راجع المقالة قيود رمز دالة المنسق.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
يقوم التطبيق بتحميل الملف من القرص ويقوم بتدفق المحتويات بطريقة غير مزامنة في النقطة التي تحمل نفس الاسم في خانة "النسخ الاحتياطي". قيمة الإرجاع هو عدد وحدات البايت المنسوخة إلى التخزين، ثم يتم استخدامها من قبل الدالة orchestrator لحساب المجموع الكلي.
إشعار
هذا مثال مثالي لنقل عمليات الإدخال/والإخراج إلى activityTrigger دالة. ليس فقط يمكن توزيع العمل عبر العديد من الآلات المختلفة ، ولكن يمكنك أيضًا الحصول على فوائد نقاط التفتيش على التقدم. إذا تم إنهاء عملية المضيف لأي سبب من الأسباب، فأنت تعرف التحميلات التي اكتملت بالفعل.
تشغيل تطبيق العرض التوضيحي
يمكنك بدء التزامن، على نظام تشغيل Windows، عن طريق إرسال طلب HTTP POST التالي.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
بدلًا من ذلك، على تطبيق دالة Linux (Python يعمل حاليًّا فقط على Linux لخدمة التطبيقات)، يمكنك بدء التزامن مثل ذلك:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
إشعار
HttpStartتعمل الدالة التي تقوم بالاستدعاء فقط مع المحتوى منسق JSON. لهذا السبب، Content-Type: application/json الرأس مطلوب ويتم ترميز مسار الدليل كسلسلة JSON. علاوة علي ذلك، يفترض قصاصة محتوى HTTP السابق وجود إدخال في host.json الملف الذي يزيل api/ البادئة الافتراضية من كافة عناوين URL دوال تشغيل HTTP. يمكنك العثور على علامات هذا التكوين في host.json الملف في العينات.
هذا الطلب لـ HTTP يقوم بتشغيل E2_BackupSiteContent التزامن وتمرير السلسلة D:\home\LogFiles كمعلمة. توفر الاستجابة رابطًا للحصول على حالة عملية النسخ الاحتياطي:
اعتمادًا على عدد ملفات السجل التي لديك في تطبيق الدوال، قد تستغرق هذه العملية عدة دقائق لإكمالها. يمكنك الحصول على أحدث حالة عن طريق الاستعلام عن عنوان URL في Location رأس استجابة HTTP 202 السابقة.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
في هذه الحالة، لا تزال الدالة قيد التشغيل. يمكنك مشاهدة الإدخال الذي تم حفظه في حالة التزامن وآخر مرة تم تحديثه. يمكنك الاستمرار في استخدام Location قيم الرأس للاستطلاع لإكمالها. عندما تكون الحالة "مكتملة"، سترى قيمة استجابة HTTP مشابهة لما يلي:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
الآن يمكنك أن ترى أن التزامن مكتمل وتقريبًا كم من الوقت استغرق لإكماله. كما ترى قيمة output للحقل، مما يشير إلى أنه تم تحميل حوالي 450 كيلوبايت من السجلات.
الخطوات التالية
وقد أظهرت هذه العينة كيفية توزيع المهام في عدة وجهات وتوزيع المهام إلى نفس الوجهة. يوضح النموذج التالي كيفية تطبيق نمط جهاز العرض باستخدام أجهزة ضبط الوقت المتينة.