Bagikan melalui


Apa itu Durable Functions?

Durable Functions adalah fitur Azure Functions yang memungkinkan Anda menulis fungsi stateful di lingkungan komputasi tanpa server. Ekstensi ini memungkinkan Anda menentukan alur kerja stateful dengan menulis fungsi orkestrator dan entitas stateful dengan menulis fungsi entitas menggunakan model pemrograman Azure Functions. Di balik layar, ekstensi ini mengelola keadaan, titik pemeriksaan, dan memulai ulang untuk Anda, memungkinkan Anda untuk fokus pada logika bisnis Anda.

Bahasa yang didukung

Durable Functions dirancang untuk bekerja dengan semua bahasa pemrograman Azure Functions tetapi mungkin memiliki persyaratan minimum yang berbeda untuk setiap bahasa. Tabel berikut menunjukkan konfigurasi aplikasi minimum yang didukung:

Tumpukan bahasa komputer Versi runtime Azure Functions Versi pekerja bahasa Versi bundel minimum
.NET / C# / F# Fungsi 1.0+ Sedang dalam proses
Di luar proses
n/a
JavaScript/TypeScript (v3 prog. model) Fungsi 2.0+ Node 8+ Paket 2.x
JavaScript/TypeScript (model prog. v4) Fungsi 4.25+ Node 18+ 3.15+ paket
Python Fungsi 2.0+ Python 3.7+ Paket 2.x
Python (model prog. v2) Functions 4.0+ Python 3.7+ 3.15+ bundel
PowerShell Functions 3.0+ PowerShell 7+ Paket 2.x
Java Fungsi 4.0+ Java 8+ Paket 4.x

Penting

Artikel ini menggunakan tab untuk mendukung beberapa versi model pemrograman Node.js. Model v4 umumnya tersedia dan dirancang untuk memiliki pengalaman yang lebih fleksibel dan intuitif untuk pengembang JavaScript dan TypeScript. Untuk detail selengkapnya tentang cara kerja model v4, lihat panduan pengembang Node.js Azure Functions. Untuk mempelajari selengkapnya tentang perbedaan antara v3 dan v4, lihat panduan migrasi.

Penting

Artikel ini menggunakan tab untuk mendukung beberapa versi model pemrograman Python. Model v2 telah tersedia secara umum dan dirancang untuk menyediakan cara yang lebih berfokus pada kode untuk menulis fungsi melalui dekorator. Untuk informasi selengkapnya tentang model v2, lihat panduan pengembang Azure Functions Python.

Seperti Azure Functions, ada templat untuk membantu Anda mengembangkan Durable Functions menggunakan Visual Studio, Visual Studio Code, dan portal Azure.

Pola aplikasi

Kasus penggunaan utama Durable Functions adalah menyederhanakan persyaratan koordinasi yang kompleks dan stateful dalam aplikasi tanpa server. Bagian berikut menjelaskan pola aplikasi umum yang dapat mengambil keuntungan dari Durable Functions:

Pola #1: Penautan fungsi

Dalam pola penautan fungsi, serangkai fungsi dijalankan dalam urutan tertentu. Dalam pola ini, output dari satu fungsi diterapkan ke input fungsi lain. Penggunaan antrean antara setiap fungsi memastikan bahwa sistem tetap tahan lama dan dapat diskalakan, meskipun ada aliran kontrol dari satu fungsi ke fungsi berikutnya.

Diagram pola penautan fungsi.

Anda dapat menggunakan Durable Functions untuk menerapkan pola penautan fungsi secara ringkas seperti yang ditunjukkan dalam contoh berikut.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan. Anda dapat menyertakan logika penanganan kesalahan dalam blok try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Anda dapat menggunakan parameter context untuk menjalankan fungsi lain dengan nama, mengoperkan parameter, dan mengembalikan output fungsi. Setiap kali kode memanggil await, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan await sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan-out/fan-in.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan. Anda dapat menyertakan logika penanganan kesalahan dalam blok try/catch/finally.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Anda dapat menggunakan context.df objek untuk memanggil fungsi lain berdasarkan namanya, melewatkan parameter, dan mengembalikan keluaran fungsi. Setiap kali kode memanggil yield, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan yield sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan-out/fan-in.

Catatan

Objek context dalam JavaScript mewakili seluruh konteks fungsi. Akses konteks Durable Functions menggunakan properti df pada konteks utama.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Anda dapat menggunakan objek context untuk menjalankan fungsi lain berdasarkan nama, mengoper parameter, dan mengembalikan hasil fungsi. Setiap kali kode memanggil yield, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan yield sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan-out/fan-in.

Catatan

Objek context di Python mewakili konteks orkestrasi. Akses konteks utama Azure Functions menggunakan properti function_context pada konteks orkestrasi.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Anda dapat menggunakan perintah Invoke-DurableActivity untuk menjalankan fungsi lain dengan nama, melewatkan parameter, dan mengembalikan keluaran fungsi. Setiap kali kode memanggil Invoke-DurableActivity tanpa sakelar NoWait, kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses atau komputer virtual mendaur ulang di tengah-tengah eksekusi, instans fungsi akan melanjutkan dari panggilan Invoke-DurableActivity sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan-out/fan-in.

Dalam contoh ini, nilai F1, F2, F3, dan F4 adalah nama fungsi lain di dalam aplikasi fungsi yang sama. Anda dapat menerapkan alur kontrol dengan menggunakan konstruksi pengodean imperatif normal. Kode dijalankan dari atas ke bawah. Kode ini dapat menyertakan semantik alur kontrol bahasa komputer yang ada, seperti kondisional dan perulangan.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Anda dapat menggunakan objek ctx untuk menjalankan fungsi lain dengan nama, mengirim parameter, dan mengembalikan hasil fungsi. Output metode ini adalah Task<V> objek di mana V adalah jenis data yang dikembalikan oleh fungsi yang dipanggil. Setiap kali Anda memanggil Task<V>.await(), kerangka kerja Durable Functions memeriksa kemajuan instans fungsi saat ini. Jika proses tiba-tiba didaur ulang di tengah eksekusi, instans fungsi akan melanjutkan dari panggilan Task<V>.await() sebelumnya. Untuk informasi selengkapnya, lihat bagian berikutnya, Pola #2: Fan-out/fan-in.

Pola #2: Penyebaran/Pengumpulan

Dalam pola fan-out/fan-in, Anda menjalankan beberapa fungsi secara paralel dan kemudian menunggu semua fungsi selesai. Seringkali, beberapa pekerjaan agregasi dilakukan pada hasil yang dikembalikan dari fungsi.

Diagram pola fan-out fan-in.

Dengan fungsi normal, Anda dapat melakukan fan out dengan meminta fungsi mengirim sejumlah pesan ke antrean. Melakukan fan in jauh lebih rumit. Untuk melakukan fan in dalam fungsi normal, Anda tulis kode untuk melacak kapan fungsi yang dipicu antrean berakhir, lalu simpan output fungsi.

Ekstensi Durable Functions menangani pola ini dengan kode yang relatif sederhana:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Pekerjaan fan-out didistribusikan ke beberapa instance fungsi F2. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. Task.WhenAll dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, output dari fungsi F2 diagregasikan dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada await panggilan Task.WhenAll memastikan bahwa potensi crash atau boot ulang di tengah jalan tidak akan menghidupkan ulang tugas yang sudah selesai.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Pekerjaan fan-out didistribusikan ke beberapa instans F2 fungsi. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. context.df.Task.all API dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, keluaran fungsi F2 diagregasikan dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada yield panggilan context.df.Task.all memastikan bahwa jika terjadi kemungkinan crash atau reboot di tengah proses, tidak perlu memulai ulang tugas yang sudah selesai.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Pekerjaan fan-out didistribusikan ke beberapa instansi fungsi F2. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. context.task_all API dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, F2 output fungsi diagregat dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada yield panggilan context.task_all memastikan bahwa potensi kerusakan atau restart di tengah proses tidak memerlukan pengulangan tugas yang sudah selesai.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Pekerjaan fan-out didistribusikan ke beberapa instans fungsi F2. Perhatikan penggunaan pengalih NoWait pada pemanggilan fungsi F2: pengalih ini memungkinkan orkestrator untuk melanjutkan pemanggilan dari F2 tanpa menunggu penyelesaian aktivitas. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. Perintah Wait-ActivityFunction dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, output fungsi F2 diagregasi dari daftar tugas dinamis dan diteruskan ke fungsi F3.

Pemeriksaan otomatis yang terjadi pada saat panggilan Wait-ActivityFunction memastikan bahwa kemungkinan crash atau setel ulang di tengah proses tidak memerlukan memulai ulang tugas yang sudah selesai.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Pekerjaan fan-out didistribusikan ke beberapa instans fungsi F2. Pekerjaan ini dilacak menggunakan daftar tugas dinamis. ctx.allOf(parallelTasks).await() dipanggil untuk menunggu semua fungsi yang dipanggil selesai. Kemudian, output fungsi F2 diagregat dari daftar tugas dinamis dan dikembalikan sebagai output fungsi orkestrator.

Titik pemeriksaan otomatis yang terjadi pada panggilan .await() di ctx.allOf(parallelTasks) memastikan bahwa daur ulang proses yang tidak diharapkan tidak akan menghidupkan ulang tugas yang sudah selesai.

Catatan

Meskipun jarang terjadi, bukan tidak mungkin crash bisa terjadi dalam rentang waktu setelah fungsi aktivitas selesai tetapi sebelum hasilnya disimpan ke dalam riwayat orkestrasi. Jika ini terjadi, fungsi aktivitas akan dijalankan kembali dari awal setelah proses pulih.

Pola #3: API HTTP Asinkron

Pola API HTTP asinkron mengatasi masalah pengoordinasian status operasi jangka panjang dengan klien eksternal. Cara umum untuk menerapkan pola ini adalah dengan meminta titik akhir HTTP memicu tindakan jangka panjang tersebut. Kemudian, alihkan klien ke titik akhir status yang akan dicek klien secara berkala untuk mengetahui kapan operasi selesai.

Diagram yang memperlihatkan pola API HTTP.

Fungsi Tahan Lama menyediakan dukungan bawaan untuk pola ini, menyederhanakan atau bahkan menghapus kode yang perlu Anda tulis untuk berinteraksi dengan eksekusi fungsi yang berjalan lama. Misalnya, sampel mulai cepat Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell, dan Java) menunjukkan perintah REST sederhana yang dapat Anda gunakan untuk memulai instans fungsi orkestrator baru. Setelah instans dimulai, ekstensi mengekspos API HTTP webhook yang mengkueri status fungsi orkestrator.

Contoh berikut menampilkan perintah REST yang memulai orkestrator dan mengkueri statusnya. Sekadar kejelasan, beberapa detail protokol dihilangkan dari contoh ini.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Karena runtime bahasa umum Durable Functions mengelola status untuk Anda, Anda tidak perlu menerapkan mekanisme penelusuran status Anda sendiri.

Ekstensi Durable Functions mengekspos API HTTP bawaan yang mengelola orkestrasi jangka panjang. Anda dapat menerapkan pola ini sendiri dengan menggunakan pemicu fungsi Anda sendiri (seperti HTTP, antrean, atau Azure Event Hubs) dan pengikatan klien yang tahan lama. Misalnya, Anda dapat menggunakan pesan antrean untuk memicu penghentian. Atau, Anda mungkin menggunakan pemicu HTTP yang dilindungi oleh kebijakan autentikasi Microsoft Entra alih-alih API HTTP bawaan yang menggunakan kunci yang dihasilkan untuk autentikasi.

Untuk informasi selengkapnya, lihat artikel Fitur HTTP, yang menjelaskan cara mengekspos proses asinkron jangka panjang melalui HTTP menggunakan ekstensi Durable Functions.

Pola #4: Monitor

Pola monitor mengacu pada proses berulang yang fleksibel dalam alur kerja. Contohnya adalah melakukan polling sampai kondisi tertentu terpenuhi. Anda dapat menggunakan pemicu timer reguler untuk menangani skenario dasar, seperti pekerjaan pembersihan berkala, tetapi intervalnya statik dan durasi masa pakai instans sulit dikelola. Anda dapat menggunakan Durable Functions untuk membuat interval pengulangan yang fleksibel, mengelola masa pakai tugas, dan membuat banyak proses monitor dari satu orkestrasi.

Contoh pola monitor adalah membalikkan skenario API HTTP asinkron sebelumnya. Alih-alih mengekspos titik akhir bagi klien eksternal untuk memantau operasi jangka panjang, monitor jangka panjang menggunakan titik akhir eksternal, lalu menunggu perubahan status.

Diagram yang memperlihatkan pola monitor.

Dalam beberapa baris kode, Anda dapat menggunakan Durable Functions untuk membuat sejumlah monitor yang mengamati titik akhir arbitrer. Monitor dapat mengakhiri eksekusi ketika kondisi terpenuhi, atau fungsi lain dapat menggunakan klien orkestrasi tahan lama untuk mengakhiri monitor. Anda dapat mengubah interval wait monitor berdasarkan kondisi tertentu (misalnya, eksponensial backoff.)

Kode berikut akan menerapkan monitor dasar:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Saat permintaan diterima, instans orkestrasi baru dibuat untuk ID pekerjaan tersebut. Instans melakukan polling status hingga kondisi terpenuhi atau hingga waktu habis berakhir. Timer tahan lama mengontrol interval polling. Kemudian, lebih banyak pekerjaan dapat dilakukan, atau orkestrasi dapat berakhir.

Pola #5: Interaksi manusia

Banyak proses otomatis yang melibatkan semacam interaksi manusia. Melibatkan manusia dalam proses otomatis itu rumit karena orang tidak selalu ada dan responsif seperti layanan cloud. Proses otomatis mungkin memungkinkan interaksi ini dengan menggunakan waktu habis dan logika kompensasi.

Proses persetujuan adalah contoh proses bisnis yang melibatkan interaksi manusia. Persetujuan dari manajer mungkin diperlukan untuk laporan pengeluaran yang melebihi jumlah dolar tertentu. Jika manajer tidak menyetujui laporan pengeluaran dalam waktu 72 jam (mungkin manajer pergi berlibur), proses eskalasi dimulai untuk mendapatkan persetujuan dari orang lain (mungkin manajer manajer).

Diagram pola interaksi manusia.

Anda dapat menerapkan pola dalam contoh ini dengan menggunakan fungsi orkestrator. Orkestrator menggunakan timer tahan lama untuk meminta persetujuan. Orkestrator meningkat jika waktu habis terjadi. Orkestrator menunggu kejadian eksternal, seperti pemberitahuan yang dibuat dari interaksi manusia.

Contoh-contoh ini membuat suatu proses persetujuan untuk menunjukkan pola interaksi manusia:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Untuk membuat timer tahan lama, hubungi context.CreateTimer. Pemberitahuan diterima oleh context.WaitForExternalEvent. Kemudian, Task.WhenAny dipanggil untuk memutuskan apakah akan meneruskan eskalasi (jika batas waktu tercapai terlebih dahulu) atau memproses persetujuan (jika persetujuan diterima sebelum batas waktu).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Untuk membuat timer tahan lama, hubungi context.df.createTimer. Pemberitahuan diterima oleh context.df.waitForExternalEvent. Kemudian, context.df.Task.any dipanggil untuk memutuskan apakah akan mengeskalasi masalah (batas waktu habis terjadi terlebih dahulu) atau memproses persetujuan (persetujuan diterima sebelum batas waktu habis).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Untuk membuat timer tahan lama, hubungi context.create_timer. Pemberitahuan diterima oleh context.wait_for_external_event. Kemudian, context.task_any dipanggil untuk memutuskan apakah akan meningkatkan (batas waktu terjadi lebih dulu) atau memproses persetujuan (persetujuan diterima sebelum batas waktu).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Untuk membuat timer tahan lama, hubungi Start-DurableTimer. Pemberitahuan diterima oleh Start-DurableExternalEventListener. Kemudian, Wait-DurableTask dipanggil untuk memutuskan apakah akan ditingkatkan (batas waktu terjadi terlebih dahulu) atau memproses persetujuan (persetujuan diterima sebelum batas waktu).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Pemanggilan metode ctx.waitForExternalEvent(...).await() menjeda orkestrasi hingga menerima peristiwa bernama ApprovalEvent, yang memiliki payload boolean. Jika peristiwa diterima, fungsi aktivitas dipanggil untuk memproses hasil persetujuan. Namun, jika tidak ada peristiwa tersebut timeout yang diterima sebelum (72 jam) kedaluwarsa, dinaikkan TaskCanceledException , dan Escalate fungsi aktivitas dipanggil.

Catatan

Tidak ada biaya untuk waktu yang dihabiskan menunggu peristiwa eksternal ketika menggunakan paket Konsumsi.

Klien eksternal dapat mengirimkan pemberitahuan kejadian ke fungsi orkestrator yang menunggu dengan menggunakan API HTTP bawaan:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Sebuah peristiwa juga dapat dipicu menggunakan klien orkestrasi yang tahan terhadap kerusakan dari fungsi lain di dalam aplikasi fungsi yang sama.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Pola #6: Agregator (entitas berstatus)

Pola keenam adalah tentang pengumpulan data kejadian selama periode waktu tertentu ke dalam satu entitas yang dapat ditangani. Dalam pola ini, data yang dikumpulkan mungkin berasal dari beberapa sumber, mungkin dikirimkan dalam batch, atau mungkin tersebar dalam jangka waktu yang lama. Agregator mungkin perlu mengambil tindakan pada data peristiwa saat tiba, dan klien eksternal mungkin perlu mengkueri data agregat.

Diagram yang menunjukkan agregator.

Hal yang rumit tentang mencoba menerapkan pola ini dengan fungsi-fungsi normal tanpa status adalah bahwa pengendalian konkurensi menjadi tantangan besar. Anda tidak hanya perlu khawatir tentang beberapa utas yang memodifikasi data yang sama pada saat yang sama, Anda juga perlu khawatir tentang memastikan bahwa agregator hanya berjalan pada satu komputer virtual pada satu waktu.

Anda dapat menggunakan Entitas Tahan Lama untuk menerapkan pola ini sebagai satu fungsi dengan mudah.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Entitas Tahan Lama juga dapat diubah sebagai kelas di .NET. Model ini dapat berguna jika daftar operasi ditetapkan dan membesar. Contoh berikut adalah penerapan setara Counter entitas menggunakan kelas dan metode .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Catatan

Entitas tahan lama saat ini tidak didukung di PowerShell.

Catatan

Entitas tahan lama saat ini tidak didukung di Java.

Klien dapat mengantrekan operasi untuk (juga dikenal sebagai penandaan) fungsi entitas menggunakan pengikatan klien entitas.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Catatan

Proksi yang dihasilkan secara dinamis juga tersedia di .NET untuk memberi sinyal kepada entitas dengan cara tipe-aman. Dan selain memberi sinyal, klien juga dapat mengkueri status fungsi entitas menggunakan metode aman jenis pada pengikatan klien orkestrasi.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Fungsi entitas tersedia dalam Durable Functions 2.0 ke atas untuk C#, JavaScript, dan Python.

Teknologi

Di belakang layar, ekstensi Durable Functions dibangun di atas Durable Task Framework, pustaka sumber terbuka di GitHub yang digunakan untuk menyusun alur kerja dalam kode. Seperti Azure Functions yang merupakan hasil evolusi tanpa server dari Azure WebJobs, Durable Functions adalah hasil evolusi tanpa server dari Durable Task Framework. Microsoft dan organisasi lain menggunakan Durable Task Framework secara ekstensif untuk mengotomatisasi proses misi penting. Ini sangat cocok untuk lingkungan tanpa server Azure Functions.

Batasan kode

Untuk memberikan jaminan eksekusi yang andal dan tahan lama, fungsi orkestrator memiliki seperangkat aturan pengodean yang harus diikuti. Untuk informasi selengkapnya, lihat Batasan kode fungsi orkestrator.

Penagihan

Durable Functions ditagih sama dengan Azure Functions. Untuk informasi selengkapnya, lihat Harga Azure Functions. Saat menjalankan fungsi orkestrator dalam paket Konsumsi Azure Functions, ada beberapa perilaku penagihan yang perlu diperhatikan. Untuk informasi selengkapnya tentang perilaku ini, lihat artikel Penagihan Durable Functions.

Langsung saja

Anda dapat memulai dengan Durable Functions dalam waktu kurang dari 10 menit dengan menyelesaikan salah satu tutorial cepat yang spesifik untuk bahasa tertentu ini.

Dalam panduan memulai cepat ini, Anda secara lokal membuat dan menguji fungsi Hello world yang tahan lama. Kemudian Anda terbitkan kode fungsi ke Azure. Fungsi yang Anda buat ini mengoordinasikan dan menautkan panggilan ke fungsi lain.

Publikasi

Durable Functions dikembangkan bekerja sama dengan Microsoft Research. Akibatnya, tim Durable Functions secara aktif menghasilkan makalah penelitian dan artefak; Ini termasuk:

Video demo

Video berikut menyorot manfaat Durable Functions:

Opsi orkestrasi lainnya

Durable Functions adalah ekstensi tingkat lanjut untuk Azure Functions, dan mungkin tidak sesuai untuk semua aplikasi. Untuk perbandingan dengan teknologi orkestrasi Azure lainnya, lihat Membandingkan Azure Functions dan Azure Logic Apps.