Сценарии развертывания и объединения в устойчивых функциях. Пример резервного копирования в облако
Мақала
Развертывание и объединение — это шаблон параллельного выполнения нескольких функций с последующим статистическим вычислением результатов. В этой статье описывается пример, использующий устойчивые функции для реализации сценариев развертывания и объединения. Образец представляет устойчивую функцию, создающую резервную копию всего или некоторого содержимого сайта приложения в службе хранилища Azure.
Примечание.
Общедоступна версия 4 модели программирования Node.js для Функции Azure. Новая модель версии 4 предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.
В следующих фрагментах кода JavaScript (PM4) обозначает модель программирования версии 4, новый интерфейс.
В этом образце функции рекурсивно отправляют все файлы в указанном каталоге в хранилище BLOB-объектов и подсчитывают общее число отправленных байтов.
Можно написать одну функцию, которая отвечает за все. Основная проблема, которая может возникнуть, связана с выполнением масштабируемости. На одной виртуальной машине можно запустить выполнение только одной функции, поэтому пропускная способность будет ограничена пропускной способностью этой отдельной виртуальной машины. Следующая проблема заключается в надежности. Если в середине процесса происходит сбой или весь процесс занимает более 5 минут, резервное копирование может завершиться ошибкой в состоянии частичного завершения. В таком случае его необходимо начать сначала.
Более надежным подходом было бы записать две обычные функции: одна для перечисления файлов и добавления их имен в очередь, а другая для чтения из очереди и отправки файлов в хранилище BLOB-объектов. Это лучше с точки зрения пропускной способности и надежности, но для этого необходимо подготовить очередь и управлять ею. Если требуется выполнить какое-либо действие (например, сообщить об общем количестве отправленных байтов), могут возникнуть значительные проблемы в контексте управления состоянием и координацией.
Подход к устойчивым функциям обеспечивает получение всех упомянутых преимуществ с очень низкими издержками.
Функции
В этой статье описаны следующие функции в примере приложения:
E2_BackupSiteContent — функция оркестратора, которая вызывает E2_GetFileList для получения списка файлов для резервного копирования, а затем вызывает E2_CopyFileToBlob для резервного копирования каждого файла.
E2_GetFileList — функция действия, возвращающая список файлов в каталоге.
E2_CopyFileToBlob — функция действия, которая создает резервную копию одного файла в Хранилище BLOB-объектов Azure.
Функция оркестратора E2_BackupSiteContent
Эта функция оркестратора выполняет следующие задачи:
Принимает значение rootDirectory в качестве входного параметра.
Вызывает функцию для получения рекурсивного списка файлов в rootDirectory.
Выполняет несколько параллельных вызовов функции для отправки каждого файла в хранилище BLOB-объектов Azure.
Ожидает завершения всех передач.
Возвращает сумму общего количества байтов, отправленных в хранилище BLOB-объектов Azure.
Ниже приведен код, реализующий функцию оркестратора.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Обратите внимание на строку await Task.WhenAll(tasks);. Все отдельные вызовы функции E2_CopyFileToBlobне ожидают выполнения, что позволяет выполнять их параллельно. При передаче массива задач в Task.WhenAll мы получаем задачу, которая не выполнится до завершения всех операций копирования. Если вы знакомы с библиотекой параллельных задач (TPL) в .NET, то вы уже знаете об этом. Разница заключается в том, что задачи могут выполняться на нескольких виртуальных машинах одновременно, а расширение Устойчивых функций гарантирует, что сквозное выполнение устойчиво к перезапуску процессов.
Если задача Task.WhenAll завершена, это говорит о том, что все вызовы функций завершены и значения возвращены. Каждый вызов E2_CopyFileToBlob возвращает число переданных байтов, поэтому, чтобы узнать сумму всех байтов, нужно сложить все возвращаемые значения.
Функция использует стандартный файл function.json для функций оркестратора.
Ниже приведен код, реализующий функцию оркестратора.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Обратите внимание на строку yield context.df.Task.all(tasks);. Все отдельные вызовы функции E2_CopyFileToBlobне приостанавливаются, что позволяет выполнять их параллельно. При передаче массива задач в context.df.Task.all мы получаем задачу, которая не выполнится до завершения всех операций копирования. Если вы знакомы с Promise.all в JavaScript, вы знаете об этом. Разница заключается в том, что задачи могут выполняться на нескольких виртуальных машинах одновременно, а расширение Устойчивых функций гарантирует, что сквозное выполнение устойчиво к перезапуску процессов.
Примечание.
Несмотря на то что задачи концептуально похожи на обещания JavaScript, функции оркестратора должны использовать context.df.Task.all и context.df.Task.any вместо Promise.all и Promise.race для управления параллелизацией задач.
Если задача context.df.Task.all приостановлена, это говорит о том, что все вызовы функций завершены и значения возвращены. Каждый вызов E2_CopyFileToBlob возвращает число переданных байтов, поэтому, чтобы узнать сумму всех байтов, нужно сложить все возвращаемые значения.
Ниже приведен код, реализующий функцию оркестратора.
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Обратите внимание на строку yield context.df.Task.all(tasks);. Все отдельные вызовы функции copyFileToBlobне приостанавливаются, что позволяет выполнять их параллельно. При передаче массива задач в context.df.Task.all мы получаем задачу, которая не выполнится до завершения всех операций копирования. Если вы знакомы с Promise.all в JavaScript, вы знаете об этом. Разница заключается в том, что задачи могут выполняться на нескольких виртуальных машинах одновременно, а расширение Устойчивых функций гарантирует, что сквозное выполнение устойчиво к перезапуску процессов.
Примечание.
Хотя задачи концептуально похожи на обещания JavaScript, функции оркестратора должны использовать context.df.Task.all , context.df.Task.any а не Promise.allPromise.race управлять параллелизацией задач.
Если задача context.df.Task.all приостановлена, это говорит о том, что все вызовы функций завершены и значения возвращены. Каждый вызов copyFileToBlob возвращает число переданных байтов, поэтому, чтобы узнать сумму всех байтов, нужно сложить все возвращаемые значения.
Функция использует стандартный файл function.json для функций оркестратора.
Ниже приведен код, реализующий функцию оркестратора.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Обратите внимание на строку yield context.task_all(tasks);. Все отдельные вызовы функции E2_CopyFileToBlobне приостанавливаются, что позволяет выполнять их параллельно. При передаче массива задач в context.task_all мы получаем задачу, которая не выполнится до завершения всех операций копирования. Если вы знакомы с asyncio.gather в Python, вы знаете об этом. Разница заключается в том, что задачи могут выполняться на нескольких виртуальных машинах одновременно, а расширение Устойчивых функций гарантирует, что сквозное выполнение устойчиво к перезапуску процессов.
Примечание.
Хотя задачи концептуально похожи на допускающие ожидание задачи Python, функции оркестратора должны использовать yield, а также API context.task_all и context.task_any для управления параллелизацией задач.
Если задача context.task_all приостановлена, это говорит о том, что все вызовы функций завершены и значения возвращены. Каждый вызов E2_CopyFileToBlob возвращает число переданных байтов, поэтому, чтобы узнать сумму всех байтов, нужно сложить все возвращаемые значения.
Вспомогательные функции действий
Вспомогательные функции действий, как и другие примеры, являются обычными функциями, которые используют привязку триггера activityTrigger.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Примечание.
Вы можете спросить, почему нельзя просто поместить этот код непосредственно в функцию оркестратора. Это действие может нарушить одно из основных правил функций оркестратора — они не должны выполнять операции ввода-вывода, включая доступ к локальной файловой системе. Дополнительные сведения см. в статье Ограничения кода функции оркестратора.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Примечание.
Чтобы запустить пример кода, потребуется установить пакет NuGet Microsoft.Azure.WebJobs.Extensions.Storage.
Иногда используются некоторые дополнительные функции привязок Функций Azure (то есть параметр Binder). Но не нужно беспокоиться об этом в рамках этого пошагового руководства.
Файл unction.json для E2_CopyFileToBlob такой же простой:
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Реализация загружает файл с диска и асинхронно выполняет потоковую передачу содержимого в одноименный большой двоичный объект в контейнере резервных копий. В результате возвращается количество байтов, скопированных в хранилище, которое затем используется функцией оркестратора для вычисления общей суммы.
Примечание.
Это отличный пример перемещения операций ввода-вывода в функцию activityTrigger. Можно не только распределить работу между несколькими машинами, но и получить преимущества создания контрольных точек этапов выполнения. Если хост-процесс по какой-либо причине завершится, вы будете знать, какие передачи уже выполнены.
Запуск примера
Можно начать оркестрацию на Windows, отправив приведенный ниже запрос HTTP POST.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Кроме того, в приложении-функции Linux (в настоящее время Python работает только в Linux для Службы приложений) можно запустить оркестрацию следующим образом:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Примечание.
Вызываемая функция HttpStart работает только с содержимым в формате JSON. По этой причине требуется заголовок Content-Type: application/json и путь к каталогу кодируется в виде строки JSON. Кроме того, в предыдущем фрагменте HTTP предполагается, что в файле host.json существует запись, которая позволяет удалить префикс api/ по умолчанию из всех URL-адресов функций для триггеров HTTP. Разметку для этой конфигурации можно найти в файле host.json в примерах.
Этот запрос HTTP активирует оркестратор E2_BackupSiteContent и передает строку D:\home\LogFiles в качестве параметра. В ответе содержится ссылка на получение информации о состоянии операции резервного копирования:
В зависимости от количества файлов журнала в приложении-функции, эта операция может занять несколько минут. Информацию об актуальном состоянии можно получить, отправив запрос на URL-адрес в заголовке Location предыдущего ответа HTTP 202.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
В этом случае функция все еще выполняется. Вы можете просмотреть входные данные, сохраненные в состоянии оркестратора, и последнее время обновления. Для опроса состояния завершения операции можно продолжать использовать значения заголовка Location. Если состояние — "Завершено", вы увидите HTTP-ответ, как в примере ниже:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Теперь оркестрация завершена, и вы можете оценить, сколько времени ушло на ее выполнение. Вы также увидите значение поля output, в котором указано, что отправлено примерно 450 КБ данных журналов.
Следующие шаги
В этом примере показано, как реализовать шаблон развертывания и объединения. В следующем примере показано, как реализовать шаблон мониторинга с помощью устойчивых таймеров.