다음을 통해 공유


Durable Functions란?

Durable Functions는 서버리스 컴퓨팅 환경에서 상태 저장 함수를 작성할 수 있도록 하는 Azure Functions의 기능입니다. 확장을 통해 Azure Functions 프로그래밍 모델에서 오케스트레이터 함수를 작성하여 상태 저장 워크플로를 정의하고, 엔터티 함수를 작성하여 상태 저장 엔터티를 정의할 수 있습니다. 확장은 내부적으로 상태, 검사점 및 다시 시작을 관리하므로 비즈니스 논리에 집중할 수 있습니다.

지원되는 언어

Durable Functions는 모든 Azure Functions 프로그래밍 언어로 작동하도록 설계되었지만 각 언어에 대한 최소 요구 사항은 다를 수 있습니다. 다음 표에서는 지원되는 최소 앱 구성을 보여줍니다.

언어 스택 Azure Functions 런타임 버전 언어 작업자 버전 최소 번들 버전
.NET/C#/F# Functions 1.0+ In-process
Out of Process
해당 없음
JavaScript/TypeScript(V3 prog. 모델) Functions 2.0+ Node 8+ 2.x 번들
JavaScript/TypeScript(V4 prog. 모델) Functions 4.25+ Node 18+ 3.15+ 번들
Python Functions 2.0+ Python 3.7+ 2.x 번들
Python(V2 prog. 모델) Functions 4.0+ Python 3.7+ 3.15+ 번들
PowerShell Functions 3.0+ PowerShell 7+ 2.x 번들
Java Functions 4.0+ Java 8+ 4.x 번들

Important

이 문서에서는 탭을 사용하여 여러 버전의 Node.js 프로그래밍 모델을 지원합니다. v4 모델은 일반적으로 사용 가능하며 JavaScript 및 TypeScript 개발자를 위해 보다 유연하고 직관적인 환경을 제공하도록 설계되었습니다. v4 모델의 작동 방식에 대한 자세한 내용은 Azure Functions Node.js 개발자 가이드를 참조하세요. v3과 v4의 차이점에 대해 자세히 알아보려면 마이그레이션 가이드를 참조하세요.

Important

이 문서에서는 탭을 사용하여 여러 버전의 Python 프로그래밍 모델을 지원합니다. v2 모델은 정식 출시되었으며 데코레이터를 통해 함수를 작성하는 더욱 코드 중심적인 방법을 제공하도록 설계되었습니다. v2 모델의 작동 방식에 대한 자세한 내용은 Azure Functions Python 개발자 가이드를 참조하세요.

Azure Functions와 마찬가지로 Visual Studio, Visual Studio CodeAzure Portal을 사용하여 Durable Functions를 개발하는 데 도움이 되는 템플릿이 있습니다.

애플리케이션 패턴

Durable Functions에 대한 기본 사용 사례는 서버리스 애플리케이션에서 복잡한 상태 저장 조정 요구 사항을 단순화하는 것입니다. 다음 섹션에서는 Durable Functions의 이점을 누릴 수 있는 일반적인 애플리케이션 패턴에 대해 설명합니다.

패턴 #1: 함수 체이닝

함수 체이닝 패턴에서는 일련의 함수가 특정 순서로 실행됩니다. 이 패턴에서 한 함수의 출력은 다른 함수의 입력에 적용됩니다. 각 함수 간에 큐를 사용하면 한 함수에서 다음 함수로의 제어 흐름이 있더라도 시스템이 내구성과 확장성을 유지할 수 있습니다.

함수 체이닝 패턴의 다이어그램

다음 예제와 같이 Durable Functions를 사용하여 함수 체이닝 패턴을 간결하게 구현할 수 있습니다.

이 예제에서 F1, F2, F3F4 값은 동일한 함수 앱에 있는 다른 함수의 이름입니다. 일반적인 명령적 코딩 구문을 사용하여 제어 흐름을 구현할 수 있습니다. 코드는 위에서 아래로 실행됩니다. 코드에는 조건부 및 루프와 같은 기존 언어 제어 흐름 의미 체계가 포함될 수 있습니다. try/catch/finally 블록에는 오류 처리 논리가 포함될 수 있습니다.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

context 매개 변수를 사용하여 이름을 기준으로 다른 함수를 호출하고, 매개 변수를 전달하며, 함수 출력을 반환할 수 있습니다. 코드에서 await를 호출할 때마다 Durable Functions 프레임워크는 현재 함수 인스턴스의 진행률 검사점을 설정합니다. 프로세스 또는 가상 머신이 실행 중간에 재생되면 함수 인스턴스가 이전 await 호출에서 다시 시작됩니다. 자세한 내용은 다음 섹션인 패턴 #2: 팬아웃/팬인을 참조하세요.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

context.df 개체를 사용하여 이름을 기준으로 다른 함수를 호출하고, 매개 변수를 전달하며, 함수 출력을 반환할 수 있습니다. 코드에서 yield를 호출할 때마다 Durable Functions 프레임워크는 현재 함수 인스턴스의 진행률 검사점을 설정합니다. 프로세스 또는 가상 머신이 실행 중간에 재생되면 함수 인스턴스가 이전 yield 호출에서 다시 시작됩니다. 자세한 내용은 다음 섹션인 패턴 #2: 팬아웃/팬인을 참조하세요.

참고 항목

JavaScript의 context 개체는 함수 컨텍스트 전체를 나타냅니다. 기본 컨텍스트의 df 속성을 사용하여 Durable Functions 컨텍스트에 액세스하세요.

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result


main = df.Orchestrator.create(orchestrator_function)

context 개체를 사용하여 이름을 기준으로 다른 함수를 호출하고, 매개 변수를 전달하며, 함수 출력을 반환할 수 있습니다. 코드에서 yield를 호출할 때마다 Durable Functions 프레임워크는 현재 함수 인스턴스의 진행률 검사점을 설정합니다. 프로세스 또는 가상 머신이 실행 중간에 재생되면 함수 인스턴스가 이전 yield 호출에서 다시 시작됩니다. 자세한 내용은 다음 섹션인 패턴 #2: 팬아웃/팬인을 참조하세요.

참고 항목

Python의 context 개체는 오케스트레이션 컨텍스트를 나타냅니다. 오케스트레이션 컨텍스트의 function_context 속성을 사용하여 주 Azure Functions 컨텍스트에 액세스하세요.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Invoke-DurableActivity 명령을 사용하여 이름을 기준으로 다른 함수를 호출하고, 매개 변수를 전달하며, 함수 출력을 반환할 수 있습니다. 코드에서 NoWait 스위치 없이 Invoke-DurableActivity을 호출할 때마다 Durable Functions 프레임워크는 현재 함수 인스턴스의 진행률 검사점을 설정합니다. 프로세스 또는 가상 머신이 실행 중간에 재생되면 함수 인스턴스가 이전 Invoke-DurableActivity 호출에서 다시 시작됩니다. 자세한 내용은 다음 섹션인 패턴 #2: 팬아웃/팬인을 참조하세요.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

ctx 개체를 사용하여 이름을 기준으로 다른 함수를 호출하고, 매개 변수를 전달하며, 함수 출력을 반환할 수 있습니다. 해당 메서드의 출력은 Task<V> 개체이며, 여기서 V는 호출된 함수에서 반환되는 데이터 형식입니다. Task<V>.await()를 호출할 때마다 Durable Functions 프레임워크는 현재 함수 인스턴스의 진행률 검사점을 설정합니다. 프로세스가 실행 중간에 예기치 않게 재활용되면 함수 인스턴스가 이전 Task<V>.await() 호출에서 다시 시작됩니다. 자세한 내용은 다음 섹션인 패턴 #2: 팬아웃/팬인을 참조하세요.

패턴 #2: 팬아웃/팬인

팬아웃/팬인 패턴에서는 여러 함수를 병렬로 실행한 다음, 모든 함수가 완료될 때까지 기다립니다. 일부 집계 작업은 함수에서 반환된 결과에 대해 수행되는 경우가 많습니다.

팬아웃/팬인 패턴의 다이어그램

일반 함수를 사용하면 함수에서 여러 메시지를 큐에 보내도록 하여 팬아웃할 수 있습니다. 다시 팬인하는 것은 훨씬 더 어렵습니다. 일반 함수에서 팬인하려면 큐 트리거 함수가 종료되는 시점을 추적한 다음, 함수 출력을 저장하는 코드를 작성해야 합니다.

Durable Functions 확장에서는 비교적 간단한 코드를 사용하여 이 패턴을 처리합니다.

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

팬아웃 작업은 F2 함수의 여러 인스턴스에 배포됩니다. 작업은 동적 작업 목록을 사용하여 추적됩니다. 호출된 모든 함수가 완료될 때까지 기다리기 위해 Task.WhenAll이 호출됩니다. 그런 다음, F2 함수 출력이 동적 작업 목록에서 집계되어 F3 함수에 전달됩니다.

Task.WhenAllawait 호출에서 검사점이 자동으로 설정되어 잠재적인 중간 충돌 또는 다시 부팅에서 이미 완료된 작업을 다시 시작할 필요가 없습니다.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

팬아웃 작업은 F2 함수의 여러 인스턴스에 배포됩니다. 작업은 동적 작업 목록을 사용하여 추적됩니다. 호출된 모든 함수가 완료될 때까지 기다리기 위해 context.df.Task.all API가 호출됩니다. 그런 다음, F2 함수 출력이 동적 작업 목록에서 집계되어 F3 함수에 전달됩니다.

context.df.Task.allyield 호출에서 검사점이 자동으로 설정되어 잠재적인 중간 충돌 또는 다시 부팅에서 이미 완료된 작업을 다시 시작할 필요가 없습니다.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

팬아웃 작업은 F2 함수의 여러 인스턴스에 배포됩니다. 작업은 동적 작업 목록을 사용하여 추적됩니다. 호출된 모든 함수가 완료될 때까지 기다리기 위해 context.task_all API가 호출됩니다. 그런 다음, F2 함수 출력이 동적 작업 목록에서 집계되어 F3 함수에 전달됩니다.

context.task_allyield 호출에서 검사점이 자동으로 설정되어 잠재적인 중간 충돌 또는 다시 부팅에서 이미 완료된 작업을 다시 시작할 필요가 없습니다.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

팬아웃 작업은 F2 함수의 여러 인스턴스에 배포됩니다. F2 함수 호출에서 NoWait 스위치 사용법에 유의하세요. 이 스위치를 사용하면 오케스트레이터가 작업이 완료될 때까지 기다리지 않고 F2를 계속 호출할 수 있습니다. 작업은 동적 작업 목록을 사용하여 추적됩니다. 호출된 모든 함수가 완료될 때까지 기다리기 위해 Wait-ActivityFunction 명령이 호출됩니다. 그런 다음, F2 함수 출력이 동적 작업 목록에서 집계되어 F3 함수에 전달됩니다.

Wait-ActivityFunction 호출에서 발생하는 검사점이 자동으로 설정되어 잠재적인 중간 충돌 또는 다시 부팅에서 이미 완료된 작업을 다시 시작할 필요가 없습니다.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

팬아웃 작업은 F2 함수의 여러 인스턴스에 배포됩니다. 작업은 동적 작업 목록을 사용하여 추적됩니다. 호출된 모든 함수가 완료될 때까지 기다리기 위해 ctx.allOf(parallelTasks).await()이 호출됩니다. 그런 다음, F2 함수 출력이 동적 태스크 목록에서 집계되어 오케스트레이터 함수의 출력으로 반환됩니다.

ctx.allOf(parallelTasks).await() 호출에서 검사점이 자동으로 설정되므로 예기치 않은 프로세스 재활용에서 이미 완료된 태스크를 다시 시작할 필요가 없습니다.

참고 항목

드문 경우이지만 활동 함수가 완료된 후 해당 완료가 오케스트레이션 기록에 저장되기 전에 창에서 충돌이 발생할 수 있습니다. 이 경우 프로세스가 복구되면 활동 함수가 처음부터 다시 실행됩니다.

패턴 #3: 비동기 HTTP API

비동기 HTTP API 패턴은 외부 클라이언트와 장기 실행 작업의 상태를 조정하는 문제를 해결합니다. 이 패턴을 구현하는 일반적인 방법은 HTTP 엔드포인트에서 장기 실행 작업을 트리거하도록 하는 것입니다. 그런 다음, 클라이언트를 클라이언트에서 폴링하는 상태 엔드포인트로 리디렉션하여 작업이 완료된 시점을 알아봅니다.

HTTP API 패턴의 다이어그램

Durable Functions는 이 패턴에 대한 기본 제공 지원을 제공하여 장기 실행 함수의 실행과 상호 작용하기 위해 작성해야 하는 코드를 간소화하거나 제거합니다. 예를 들어 Durable Functions 빠른 시작 샘플(C#, JavaScript, TypeScript, Python, PowerShellJava)에서는 새 오케스트레이터 함수 인스턴스를 시작하는 데 사용할 수 있는 간단한 REST 명령을 보여 줍니다. 인스턴스가 시작되면 확장에서 오케스트레이터 함수 상태를 쿼리하는 웹후크 HTTP API를 공개합니다.

다음 예제에서는 오케스트레이터를 시작하고 해당 상태를 쿼리하는 REST 명령을 보여 줍니다. 명확히 하기 위해 예제에서 일부 프로토콜 세부 정보가 생략되었습니다.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Durable Functions 런타임에서 사용자를 위해 상태를 관리하므로 사용자 고유의 상태 추적 메커니즘을 구현할 필요가 없습니다.

Durable Functions 확장은 장기 실행 오케스트레이션을 관리하는 기본 제공 HTTP API를 공개합니다. 또는 사용자 고유의 함수 트리거(예: HTTP, 큐 또는 Azure Event Hubs)와 지속성 클라이언트 바인딩을 사용하여 이 패턴을 직접 구현할 수 있습니다. 예를 들어 큐 메시지를 사용하여 종료를 트리거할 수 있습니다. 또는 생성된 키를 인증에 사용하는 기본 제공 HTTP API 대신 Microsoft Entra 인증 정책으로 보호되는 HTTP 트리거를 사용할 수 있습니다.

자세한 내용은 HTTP 기능 문서를 참조하세요. 이 문서에서는 Durable Functions 확장을 사용하여 HTTP를 통해 비동기 장기 실행 프로세스를 공개할 수 있는 방법을 설명합니다.

패턴 #4: 모니터

모니터 패턴은 워크플로에서 유연한 되풀이 프로세스를 나타냅니다. 예를 들어 특정 조건이 충족될 때까지 폴링하는 경우가 있습니다. 일반 타이머 트리거를 사용하여 정기적인 정리 작업과 같은 기본 시나리오를 처리할 수 있지만, 간격이 정적이고 인스턴스 수명 관리가 복잡해집니다. Durable Functions를 사용하여 유연한 되풀이 간격을 만들고, 작업 수명을 관리하며, 단일 오케스트레이션에서 여러 모니터 프로세스를 만들 수 있습니다.

모니터 패턴의 예로 앞의 비동기 HTTP API 시나리오를 반대로 바꾸는 것이 있습니다. 장기 실행 모니터는 외부 클라이언트의 엔드포인트를 공개하여 장기 실행 작업을 모니터링하는 대신, 외부 엔드포인트를 사용하여 상태 변경을 기다립니다.

모니터 패턴의 다이어그램

몇 줄의 코드에서 Durable Functions를 사용하여 임의의 엔드포인트를 관찰하는 여러 모니터를 만들 수 있습니다. 조건이 충족되면 모니터에서 실행을 종료하거나, 다른 함수에서 지속형 오케스트레이션 클라이언트를 사용하여 모니터를 종료할 수 있습니다. 특정 조건(예: 지수 백오프)에 따라 모니터의 wait 간격을 변경할 수 있습니다.

다음 코드에서는 기본 모니터를 구현합니다.

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

요청을 받으면 해당 작업 ID에 대해 새 오케스트레이션 인스턴스가 만들어집니다. 인스턴스는 조건이 충족되거나 제한 시간이 만료될 때까지 상태를 폴링합니다. 지속성 타이머는 폴링 간격을 제어합니다. 그런 다음, 더 많은 작업을 수행하거나 오케스트레이션을 종료할 수 있습니다.

패턴 #5: 인간 상호 작용

자동화된 많은 프로세스에는 일종의 사용자 개입이 포함됩니다. 사용자는 클라우드 서비스만큼 가용성과 응답성이 높지 않으므로 자동화된 프로세스에 사람을 참여시키는 것은 까다로운 작업입니다. 자동화된 프로세스에서는 시간 제한 및 보정 논리를 사용하여 이 상호 작용을 허용할 수 있습니다.

승인 프로세스는 사용자 개입을 수반하는 비즈니스 프로세스의 한 예입니다. 특정 달러 금액을 초과하는 경비 보고서에는 관리자의 승인이 필요할 수도 있습니다. 관리자가 72시간 이내에 경비 보고서를 승인하지 않으면(관리자가 휴가 중이었을 수도 있음) 에스컬레이션 프로세스가 시작되어 다른 사람(아마도 관리자의 관리자)의 승인을 받습니다.

사용자 개입 패턴의 다이어그램

이 예제에서는 오케스트레이터 함수를 사용하여 패턴을 구현할 수 있습니다. 오케스트레이터는 지속성 타이머를 사용하여 승인을 요청합니다. 시간이 초과되면 오케스트레이터가 에스컬레이션됩니다. 오케스트레이터는 사용자 개입으로 생성된 알림과 같은 외부 이벤트를 기다립니다.

다음 예제에서는 사용자 개입 패턴을 보여 주는 승인 프로세스를 만듭니다.

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

지속성 타이머를 만들려면 context.CreateTimer를 호출합니다. context.WaitForExternalEvent에서 알림을 받습니다. 그런 다음, Task.WhenAny를 호출하여 에스컬레이션할지(시간 제한이 먼저 발생함) 또는 승인을 처리할지(시간 제한 전에 승인이 수신됨)를 결정합니다.

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

지속성 타이머를 만들려면 context.df.createTimer를 호출합니다. context.df.waitForExternalEvent에서 알림을 받습니다. 그런 다음, context.df.Task.any를 호출하여 에스컬레이션할지(시간 제한이 먼저 발생함) 또는 승인을 처리할지(시간 제한 전에 승인이 수신됨)를 결정합니다.

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

지속성 타이머를 만들려면 context.create_timer를 호출합니다. context.wait_for_external_event에서 알림을 받습니다. 그런 다음, context.task_any를 호출하여 에스컬레이션할지(시간 제한이 먼저 발생함) 또는 승인을 처리할지(시간 제한 전에 승인이 수신됨)를 결정합니다.

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

지속성 타이머를 만들려면 Start-DurableTimer를 호출합니다. Start-DurableExternalEventListener에서 알림을 받습니다. 그런 다음, Wait-DurableTask를 호출하여 에스컬레이션할지(시간 제한이 먼저 발생함) 또는 승인을 처리할지(시간 제한 전에 승인이 수신됨)를 결정합니다.

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

ctx.waitForExternalEvent(...).await() 메서드 호출은 boolean 페이로드가 있는 ApprovalEvent라는 이벤트를 받을 때까지 오케스트레이션을 일시 중지합니다. 이벤트가 수신되면 승인 결과를 처리하기 위해 활동 함수가 호출됩니다. 그러나 timeout(72시간)이 만료되기 전에 이러한 이벤트가 수신되지 않으면 TaskCanceledException이 발생하고 Escalate 활동 함수가 호출됩니다.

참고 항목

Consumption 계획에서 실행할 때 외부 이벤트를 기다리는 데 소요된 시간에는 요금이 부과되지 않습니다.

외부 클라이언트는 기본 제공 HTTP API를 사용하여 이벤트 알림을 대기 중인 오케스트레이터 함수에 전달할 수 있습니다.

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

동일한 함수 앱의 다른 함수에서 지속성 오케스트레이션 클라이언트를 사용하여 이벤트를 발생시킬 수도 있습니다.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

패턴 #6: 집계(상태 저장 엔터티)

여섯 번째 패턴은 일정 기간 동안의 이벤트 데이터를 주소 지정 가능한 단일 엔터티로 집계하는 것입니다. 이 패턴에서 집계되는 데이터는 여러 원본에서 제공되거나, 일괄 처리로 전달되거나, 장기간에 걸쳐 분산될 수 있습니다. 집계는 도착한 이벤트 데이터에 대한 작업을 수행해야 할 수 있으며, 외부 클라이언트는 집계된 데이터를 쿼리해야 할 수도 있습니다.

집계 다이어그램

일반적인 상태 비저장 함수를 사용하여 이러한 패턴을 구현하려고 할 때 어려운 점은 동시성 제어가 엄청난 과제가 된다는 것입니다. 여러 스레드에서 동일한 데이터를 동시에 수정하는 것과 집계가 한 번에 하나의 VM에서만 실행되도록 하는 것에 대해 걱정할 필요가 있습니다.

지속성 엔터티를 사용하여 이 패턴을 단일 함수로 쉽게 구현할 수 있습니다.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

지속성 엔터티는 .NET에서 클래스로 모델링할 수도 있습니다. 작업 목록이 고정되어 커지는 경우 이 모델이 유용할 수 있습니다. 다음 예제에서는 .NET 클래스와 메서드를 사용하여 Counter 엔터티를 동일하게 구현합니다.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

참고 항목

지속성 엔터티는 현재 PowerShell에서 지원되지 않습니다.

참고 항목

지속성 엔터티는 현재 Java에서 지원되지 않습니다.

클라이언트는 엔터티 클라이언트 바인딩을 사용하여 엔터티 함수에 대한 작업("신호 보내기"라고도 함)을 큐에 넣을 수 있습니다.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

참고 항목

동적으로 생성된 프록시는 .NET에서 엔터티에 대한 신호를 형식이 안전한 방식으로 보내는 데에도 사용할 수 있습니다. 그리고 신호를 보내는 것 외에도 클라이언트는 오케스트레이션 클라이언트 바인딩에서 형식이 안전한 메서드를 사용하여 엔터티 함수의 상태를 쿼리할 수 있습니다.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

엔터티 함수는 C#, JavaScript 및 Python용 Durable Functions 2.0 이상에서 사용할 수 있습니다.

기술

Durable Functions 확장은 내부적으로 코드에서 워크플로를 작성하는 데 사용되는 GitHub의 오픈 소스 라이브러리인 지속성 작업 프레임워크를 기반으로 하여 빌드됩니다. Azure Functions가 Azure WebJobs의 서버리스 진화인 것처럼 Durable Functions는 지속성 작업 프레임워크의 서버리스 진화입니다. Microsoft 및 기타 조직에서는 지속성 작업 프레임워크를 광범위하게 사용하여 중요 업무용 프로세스를 자동화합니다. 이는 서버를 사용하지 않는 Azure Functions 환경에 적합합니다.

코드 제약 조건

안정적이고 장기 실행되는 실행을 보장하기 위해 오케스트레이터 함수에는 따라야 하는 코딩 규칙 세트가 있습니다. 자세한 내용은 오케스트레이터 함수 코드 제약 조건 문서를 참조하세요.

결제

Durable Functions 요금은 Azure Functions와 동일하게 청구됩니다. 자세한 내용은 Azure Functions 가격 책정을 참조하세요. Azure Functions 소비 계획에서 오케스트레이터 함수를 실행할 때 알고 있어야 할 몇 가지 청구 동작이 있습니다. 이러한 동작에 대한 자세한 내용은 Durable Functions 청구 문서를 참조하세요.

지금 바로 시작

다음 언어별 빠른 시작 자습서 중 하나를 완료하여 10분 이내에 Durable Functions를 시작할 수 있습니다.

이러한 빠른 시작에서는 "hello world" 지속성 함수를 로컬에서 만들고 테스트합니다. 그런 후 함수 코드를 Azure에 게시합니다. 생성한 함수는 다른 함수에 대한 호출을 오케스트레이션하고 함께 연결합니다.

게시

Durable Functions는 Microsoft Research와 공동으로 개발되었습니다. 결과적으로 Durable Functions 팀에서 연구 논문과 아티팩트를 적극적으로 생성합니다. 여기에는 다음이 포함되었습니다.

자세한 정보

다음 비디오는 Durable Functions의 이점을 강조합니다.

Durable Functions는 Azure Functions의 고급 확장이므로 모든 애플리케이션에 적합하지는 않습니다. 다른 Azure 오케스트레이션 기술과 비교해 보려면 Azure Functions 및 Azure Logic Apps 비교를 참조하세요.

다음 단계