Устойчивые функции — это расширение Функций Azure, которое позволяет писать функции с отслеживанием состояния в беcсерверной вычислительной среде. Расширение позволяет определять рабочие процессы с отслеживанием состояния, создавая функции оркестратора, и сущности с отслеживанием состояния, создавая функции сущностей с помощью модели программирования Функций Azure. В фоновом режиме расширение автоматически управляет состоянием, создает контрольные точки и перезагружается, позволяя вам сосредоточиться на бизнес-логике.
Поддерживаемые языки
Устойчивые функции предназначены для работы со всеми языками программирования в службе "Функции Azure", однако они могут предъявлять разные минимальные требования для каждого языка. В следующей таблице показаны минимальные поддерживаемые конфигурации приложений:
Языковой стек
Версии среды выполнения службы "Функции Azure"
Версия рабочей роли языка
Минимальная версия пакетов
.NET / C# / F#
Функции 1.0 и более поздней версии
Внутрипроцессно Внепроцессно
Недоступно
JavaScript/TypeScript (модель prog. версии 3)
Функции 2.0 и более поздней версии
Node 8 и более поздней версии
Пакеты 2.х
JavaScript/TypeScript (модель prog. версии 4)
Функции 4.16.5+
Узел 18+
Пакеты версии 3.15+
Python
Функции 2.0 и более поздней версии
Python 3.7 и более поздней версии
Пакеты 2.х
Python (модель prog. версии 2)
Функции 4.0 и более поздних версий
Python 3.7 и более поздней версии
Пакеты версии 3.15+
PowerShell
Функции 3.0 и более поздней версии
PowerShell 7 и более поздней версии
Пакеты 2.х
Java
Функции 4.0 и более поздних версий
Java 8 и более поздней версии
Пакеты 4.х
Примечание
Новые модели программирования для разработки функций в Python (версия 2) и Node.js (версия 4) в настоящее время находятся в предварительной версии. По сравнению с текущими моделями новые возможности предназначены для более гибких и интуитивно понятных для разработчиков Python и JavaScript/TypeScript. Дополнительные сведения о различиях между моделями см. в руководстве разработчика python и Node.js руководстве по обновлению.
В следующих фрагментах кода Python (PM2) обозначает модель программирования версии 2, а JavaScript (PM4) — модель программирования версии 4, новые возможности.
Основной вариант использования Устойчивых функций — это упрощение комплексных требований координации с отслеживанием состояния в безсерверных приложениях. В следующих разделах описываются типичные шаблоны приложений, в которых можно эффективно использовать Устойчивые функции.
В шаблоне цепочки функций последовательность функций выполняется в определенном порядке. В этом шаблоне выходные данные одной функции применяются к входным данным другой. Использование очередей между каждой функцией гарантирует, что система остается устойчивой и масштабируемой, даже если существует поток управления от одной функции к другой.
Устойчивые функции можно использовать для реализации шаблона цепочки функций, как показано в следующем примере.
В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую как условные обозначения и циклы. Вы можете включить логику обработки ошибок в блоках try/catch/finally.
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Вы можете использовать параметр context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает await, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова await. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
[Function("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
Вы можете использовать параметр context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает await, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова await. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Вы можете использовать объект context.df для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Примечание
Объект context в JavaScript представляет весь контекст функции. Доступ к контексту Устойчивых функций с помощью свойства df в основном контексте.
Вы можете использовать объект context.df для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Примечание
Объект context в JavaScript представляет весь контекст функции. Доступ к контексту Устойчивых функций с помощью свойства df в основном контексте.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
Вы можете использовать объект context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Примечание
Объект context в Python представляет контекст оркестрации. Получите доступ к основному контексту Функций Azure, используя свойство function_context в контексте оркестрации.
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
Вы можете использовать объект context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Примечание
Объект context в Python представляет контекст оркестрации. Получите доступ к основному контексту Функций Azure, используя свойство function_context в контексте оркестрации.
Вы можете использовать команду Invoke-DurableActivity для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает Invoke-DurableActivity без параметра NoWait, среда Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова Invoke-DurableActivity. Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
Вы можете использовать объект ctx для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Выходными данными этих вызовов метода является объект Task<V>, где V — это тип данных, возвращаемых вызываемой функцией. Каждый раз, когда вызывается Task<V>.await(), платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс неожиданно перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова Task<V>.await(). Этот процесс описан в следующем разделе Шаблон 2: развертывание и объединение.
Шаблон 2. Развертывание и объединение
В шаблоне развертывания и объединения вы выполняете параллельно несколько функций, а затем ожидаете их завершения. Часто некоторые агрегирования завершаются по результатам, возвращаемым функциями.
При использовании обычных функций вы можете выполнить развертывание за счет отправки функцией нескольких сообщений в очередь. Войти обратно гораздо сложнее. Чтобы выполнить объединение в обычной функции, напишите код для отслеживания момента, когда функции, активируемые очередью, завершаются, а затем их выходные значения сохраняются.
Расширение "Устойчивые функции" обрабатывает этот шаблон с помощью относительно простого кода:
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. Task.WhenAll вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове await к Task.WhenAll, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
[Function("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. Task.WhenAll вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове await к Task.WhenAll, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.df.Task.all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове yield к context.df.Task.all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
const df = require("durable-functions");
df.app.orchestration("fanOutFanInDemo", function* (context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.df.Task.all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове yield к context.df.Task.all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.task_all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове yield к context.task_all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. context.task_all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове yield к context.task_all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
Процесс развертывания распределяется по нескольким экземплярам функции F2. Обратите внимание, что параметр NoWait при вызове функции F2 позволяет оркестратору продолжать вызов F2 без ожидания завершения действия. И отслеживается с использованием динамического списка задач. Команда Wait-ActivityFunction вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.
Автоматическое создание контрольных точек, которое происходит при вызове Wait-ActivityFunction, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
Процесс развертывания распределяется по нескольким экземплярам функции F2. И отслеживается с использованием динамического списка задач. ctx.allOf(parallelTasks).await() вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из динамического списка задач и возвращаются в виде выходных данных функции оркестратора.
Автоматическое создание контрольных точек, которое происходит при вызове .await() в ctx.allOf(parallelTasks), гарантирует, что неожиданная перезагрузка процесса не потребуют перезапуска уже завершенных задач.
Примечание
В редких случаях может произойти сбой в окне после завершения функции действия, но до того, как ее выполнение будет сохранено в журнале оркестрации. В этом случае функция действия будет повторно выполнена с начала после восстановления процесса.
Шаблон 3. Асинхронные API-интерфейсы HTTP
Шаблон асинхронного API HTTP решает проблему координации состояния длительных операций с внешними клиентами. Распространенный способ реализации этого шаблона — наличие триггера конечной точки HTTP для выполнения длительного действия. Затем перенаправьте клиент на конечную точку состояния, которую клиент опрашивает для получения сведений о завершении операции.
Устойчивые функции предоставляют встроенную поддержку для этого шаблона, упрощая или даже удаляя код, который вам нужно написать для взаимодействия с длительными выполнениями функций. Например, в примерах быстрого запуска Устойчивые функции (C#, JavaScript, TypeScript, Python, PowerShell и Java) показана простая команда REST, которую можно использовать для запуска новых экземпляров функции оркестратора. После запуска экземпляра расширение предоставляет API HTTP веб-перехватчика, которые запрашивают состояние функции оркестратора.
В следующем примере показаны команды REST, которые запускают оркестратор и запрашивают его состояние. Для ясности в примере опущены некоторые данные протокола.
Так как управление состоянием осуществляется средой выполнения Устойчивых функций, вам не нужно реализовывать собственный механизм отслеживания состояния.
Расширение "Устойчивые функции" предоставляет встроенные API HTTP, которые управляют длительными оркестрациями. Вы также можете реализовать этот шаблон самостоятельно, используя собственные триггеры функций (например, HTTP, очередь или Центры событий Azure) и устойчивую привязку клиента. Например, для активации действия прекращения можно использовать сообщение очереди. Или можно использовать триггер HTTP, защищенный политикой проверки подлинности Azure Active Directory, вместо встроенных API-интерфейсов HTTP с созданным ключом для проверки подлинности.
Дополнительные сведения см. в статье о функциях HTTP, в которой объясняется, как можно предоставлять асинхронные длительные процессы по протоколу HTTP с помощью расширения "Устойчивые функции".
Шаблон 4. Монитор
Шаблон мониторинга представляет собой гибкий, повторяющийся процесс в рабочем процессе. Например, повторение опроса, пока не будут выполнены определенные условия. Вы можете использовать регулярный триггер таймера для активации основного сценария, например задание периодической очистки. Но интервал является статическим, что усложняет управление временем существования экземпляра. Вы можете использовать Устойчивые функции, чтобы создать гибкие интервалы повторения, управлять временем существования задачи и создавать несколько процессов мониторинга в рамках одной оркестрации.
Примером шаблона мониторинга является измененный сценарий асинхронных API-интерфейсов HTTP, приведенный ранее. Вместо предоставления конечной точки внешнему клиенту для мониторинга длительной операции, мониторинг использует внешнюю конечную точку, а затем ожидает изменение состояния.
Благодаря нескольким строкам кода можно использовать Устойчивые функции, чтобы создать несколько мониторингов, которые наблюдают за произвольными конечными точками. Работа мониторингов может приостановиться, если выполнено условие, или другая функция может использовать клиент оркестрации для завершения мониторинга. Можно изменить интервал мониторинга wait в зависимости от определенного условия (например, экспоненциальной задержки).
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", machineId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
[Function("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context, int jobId)
{
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", machineId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", machineId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
const df = require("durable-functions");
const { DateTime } = require("luxon");
df.app.orchestration("monitorDemo", function* (context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (DateTime.fromJSDate(context.df.currentUtcDateTime) < DateTime.fromJSDate(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", machineId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = DateTime.fromJSDate(context.df.currentUtcDateTime).plus({
seconds: pollingInterval,
});
yield context.df.createTimer(nextCheck.toJSDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
import json
from datetime import timedelta
import azure.functions as func
import azure.durable_functions as df
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@myApp.orchestration_trigger(context_name="context")
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
При получении запроса создается экземпляр оркестрации для указанного идентификатора события. Экземпляр опрашивает состояние до тех пор, пока не будет выполнено условие или до истечения времени ожидания. Для управления интервалом опроса используется устойчивый таймер. После этого можно перейти к другим задачам или завершить оркестрацию.
Шаблон 5. Участие пользователя
Многие автоматизированные процессы требуют некоторого участия пользователя. Трудность вовлечения людей в автоматизированный процесс заключается в том, что пользователи не так доступны и не так быстро реагируют, как облачные службы. Автоматизированный процесс должен учитывать это. Для этого используется время ожидания и логика компенсации.
Процесс утверждения является примером бизнес-процесса, который предполагает взаимодействие с пользователями. Утверждение от менеджера может потребоваться для отчета о расходах, превышающих определенную денежную сумму. Если менеджер не подтверждает отчет о расходах в течение 72 часов (возможно, он отправился в отпуск), начинается процесс эскалации, чтобы получить одобрение от кого-то другого (возможно, вышестоящего менеджера).
Этот шаблон в примере можно реализовать с помощью функции оркестратора. Оркестратор использует устойчивый таймер, чтобы запросить одобрение. Оркестратор начинает эскалацию, если истекло время ожидания. Он ожидает внешнее событие в виде уведомления, созданного в результате участия пользователя.
В этих примерах создается процесс утверждения для демонстрации шаблона участия пользователя:
Для создания устойчивого таймера вызовите context.CreateTimer. Уведомление получает context.WaitForExternalEvent. Затем Task.WhenAny вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите context.CreateTimer. Уведомление получает context.WaitForExternalEvent. Затем Task.WhenAny вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите context.df.createTimer. Уведомление получает context.df.waitForExternalEvent. Затем context.df.Task.any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите context.df.createTimer. Уведомление получает context.df.waitForExternalEvent. Затем context.df.Task.any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите context.create_timer. Уведомление получает context.wait_for_external_event. Затем context.task_any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите context.create_timer. Уведомление получает context.wait_for_external_event. Затем context.task_any вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
Для создания устойчивого таймера вызовите Start-DurableTimer. Уведомление получает Start-DurableExternalEventListener. Затем Wait-DurableTask вызывается для того, чтобы решить, следует ли ускорить процесс (сначала истечет время ожидания) или утвердить процесс (утверждение получено до истечения времени ожидания).
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
Вызов метода ctx.waitForExternalEvent(...).await() приостанавливает оркестрацию, пока он не получит событие с именем ApprovalEvent, содержащее полезные данные boolean. При получении события вызывается функция действия для обработки результата утверждения. Однако если такое событие не получено до истечения срока действия timeout (72 часа), вызываются TaskCanceledException и функция действия Escalate.
Примечание
Плата за время, затраченное на ожидание внешних событий при выполнении в плане потребления, не взимается.
Внешний клиент может доставить уведомление о событии в функцию оркестратора, находящуюся в состоянии ожидания, через встроенные интерфейсы API HTTP:
Шаблон 6. Агрегатор (сущности с отслеживанием состояния)
Шестой шаблон заключается в статистической обработке данных событий за определенный период времени в одну доступную для адресации сущность. В этом шаблоне данные, для которых выполняется агрегирование, могут поступать из нескольких источников, могут быть доставлены пакетами или разбросаны по длительным временным периодам. Агрегатору может потребоваться выполнить действия над данными событий по мере их поступления, а внешним клиентам может потребоваться запросить агрегированные данные.
Сложности при реализации этого шаблона с обычными функциями без отслеживания состояния заключаются в том, что управление параллелизмом крайне усложняется. Нужно не только беспокоиться о том, что несколько потоков одновременно изменяют одни и те же данные, также важно и то, чтобы агрегатор одновременно выполнялся только на одной виртуальной машине.
Устойчивые сущности можно использовать, чтобы легко реализовать этот шаблон как отдельную функцию.
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
Устойчивые сущности можно также моделировать как классы в .NET. Эта модель может быть полезной, если список операций является фиксированным и становится большим. Следующий пример представляет собой эквивалентную реализацию сущности Counter с помощью классов и методов .NET.
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
Устойчивые сущности в настоящее время не поддерживаются в . Рабочая роль, изолированная от NET.
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Примечание
Динамически создаваемые прокси-серверы также доступны в .NET для сигнализации сущностям типобезопасным способом. Кроме сигнализации, клиенты также могут запрашивать состояние функции сущности с помощью типобезопасных методов в привязке клиента оркестрации.
Устойчивые сущности в настоящее время не поддерживаются в . Рабочая роль, изолированная от NET.
Устойчивые сущности в настоящее время не поддерживаются в PowerShell.
В настоящее время в Java не поддерживаются устойчивые сущности.
Функции сущности доступны в расширении Устойчивые функции 2.0 и более поздних версий для C#, JavaScript и Python.
Технология
Расширение "Устойчивые функции" создано на платформе устойчивых задач, библиотеки с открытым кодом на GitHub для создания рабочих процессов в коде. Подобно тому, как Функции Azure являются бессерверным развитием веб-заданий Azure, Устойчивые функции — это бессерверное развитие платформы устойчивых задач. Платформа устойчивых задач широко используется как корпорацией Майкрософт, так и другими организациями для автоматизации критически важных процессов. Это естественный выбор для бессерверной среды функций Azure.
Ограничения кода
Чтобы обеспечить надежность и гарантировать длительное выполнение, в функциях оркестратора предусмотрен набор правил кода, которым нужно следовать. Дополнительные сведения см. в статье Orchestrator function code constraints (Ограничения кода функции оркестратора).
Выставление счетов
Счета за устойчивые функции выставляются так же, как и за Функции Azure. Дополнительные сведения см. на странице цен на Функции Azure. При выполнении функций оркестратора в рамках плана потребления Функций Azure необходимо учитывать некоторые особенности при выставлении счетов. Дополнительные сведения об этом см. в статье Durable Functions Billing (Выставление счетов за Устойчивые функции).
Приступайте к работе
Вы можете начать работу с Устойчивыми функциями менее чем за 10 минут, просмотрев одно из следующих кратких руководств по конкретным языкам.
В этих кратких руководствах показано, как локально создать и протестировать устойчивую функцию "Hello world". Затем вы опубликуете код функции в Azure. Функция, которую вы создаете, организовывает и объединяет в цепочку вызовы других функций.
Публикации
Расширение "Устойчивые функции" разработано совместно с Microsoft Research. Команда, работающая над Устойчивыми функциями, активно создает исследовательские статьи и артефакты, включая следующее:
В следующем видео показаны преимущества Устойчивых функций.
Более подробное обсуждение Устойчивых функций и базовой технологии см. в следующем видео (оно ориентировано на .NET, но понятия также применимы и к другим поддерживаемым языкам):