Gyors kezdés: Durable Task SDK alkalmazás üzemeltetése Azure Container Apps-on

Fontos

A PowerShell Durable Task SDK jelenleg nem érhető el.

Ezen rövid útmutató segítségével megtanulhatja a következőket:

  • Klónozza és előkészíti a Durable Task Scheduler mintaprojektet.
  • Futtassa a munkavégző és kliens alkalmazásokat az Azure Container Apps-on az Azure Developer CLI használatával.
  • Ellenőrizze az üzembe helyezést Azure Container Apps naplóstreamek használatával.
  • Ellenőrizze az orchestration állapotát és előzményeit a Durable Task Scheduler irányítópultján.

Előfeltételek

Mielőtt hozzákezdene:

  • Győződjön meg arról, hogy a Python 3.9+ vagy újabb verzióval rendelkezik.
  • Telepítse a Dockert. A Docker szükséges a tárolólemezképek üzembe helyezése során történő létrehozásához.
  • Az Azure Developer CLI telepítése
  • Klónozza a Durable Task Scheduler GitHub-adattárat a gyorsútmutató-minta használatához.
  • Győződjön meg arról, hogy Java 8 vagy 11 verzióval rendelkezik.
  • Telepítse a Dockert. A Docker szükséges a tárolólemezképek üzembe helyezése során történő létrehozásához.
  • Az Azure Developer CLI telepítése
  • Klónozza a Durable Task Scheduler GitHub-adattárat a gyorsútmutató-minta használatához.
  • Győződjön meg arról, hogy 22 -Node.js vagy újabb verzióval rendelkezik.
  • Telepítse a Dockert. A Docker szükséges a tárolólemezképek üzembe helyezése során történő létrehozásához.
  • Az Azure Developer CLI telepítése
  • Klónozza a Durable Task Scheduler GitHub-adattárat a gyorsútmutató-minta használatához.

A projekt előkészítése

Egy új terminálablakban, a klónozott Azure-Samples/Durable-Task-Scheduler könyvtárból keresse meg a függvényláncoló mintát:

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
cd /samples/durable-task-sdks/javascript/function-chaining

Üzembe helyezés az Azure Developer CLI használatával

A Azure fejlesztői parancssori felület (azd) minden szükséges Azure infrastruktúrát kiépít, és egyetlen parancsban helyezi üzembe a feldolgozó és az ügyfélalkalmazást is.

  1. Egyszerűen futtassa a azd up parancsot, hogy előállítsa az infrastruktúrát és telepítse az alkalmazást az Azure Container Apps alkalmazásban.

    azd up
    
  2. Amikor a rendszer kéri a terminálban, adja meg a következő paramétereket.

    Paraméter Leírás
    Környezet neve Az összes Azure-erőforrás tárolására létrehozott erőforráscsoport előtagja.
    Azure-helyszín Az erőforrások Azure-beli helye.
    Azure-előfizetés Az erőforrásokhoz tartozó Azure-előfizetés.

    Ez a folyamat eltarthat egy ideig. A azd up parancs befejeződése után a parancssori felület kimenete két Azure portálhivatkozást jelenít meg az üzembehelyezési folyamat figyeléséhez. A kimenet azt is bemutatja, hogyan azd up:

    • Létrehozza és konfigurálja az összes szükséges Azure erőforrást a ./infra könyvtárban található Bicep fájlokon keresztül a azd provision használatával. Az Azure Developer CLI üzembe helyezése után ezeket az erőforrásokat az Azure Portalon érheti el. Az Azure-erőforrásokat kiosztó fájlok a következők:
      • main.parameters.json
      • main.bicep
      • Erőforrások app könyvtára funkciók szerint rendszerezve
      • Egy core referenciatár, amely a azd sablon által használt Bicep modulokat tartalmazza
    • A kód üzembe helyezése a következő használatával: azd deploy

    Várt kimenet

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Sikeres üzembe helyezés megerősítése

Az Azure portálon ellenőrizze, hogy az orchestrációk sikeresen futnak-e.

  1. Másolja ki az erőforráscsoport nevét a terminál kimenetéből.

  2. Jelentkezzen be a Azure portálra és keresse meg az erőforráscsoport nevét.

  3. Az erőforráscsoport áttekintési lapján válassza ki az ügyféltároló alkalmazás erőforrását.

  4. Válassza a Figyelési>naplóstream lehetőséget.

  5. Ellenőrizze, hogy a mintatároló-alkalmazás naplózta-e a függvényláncolási feladatokat.

    Képernyőkép a Java-mintaalkalmazás naplóstreaméről az Azure Portalon.

  1. Másolja ki az erőforráscsoport nevét a terminál kimenetéből.

  2. Jelentkezzen be a Azure portálra és keresse meg az erőforráscsoport nevét.

  3. Az erőforráscsoport áttekintési lapján válassza ki az ügyféltároló alkalmazás erőforrását.

  4. Válassza a Figyelési>naplóstream lehetőséget.

  5. Ellenőrizze, hogy az ügyféltároló naplóozza-e a függvényláncolási feladatokat.

    Az Azure portálon a kliens konténer naplófolyamának képernyőképe.

  6. Lépjen vissza az erőforráscsoport lapjára a worker tároló kiválasztásához.

  7. Válassza a Figyelési>naplóstream lehetőséget.

  8. Ellenőrizze, hogy a munkatároló naplózza-e a függvényláncolási feladatokat.

    A munkavégző konténer naplóstreamjének képernyőképe az Azure portalon.

Az orchestration állapotát és előzményeit a Durable Task Scheduler irányítópultján is áttekintheti. További információ: Durable Task Scheduler irányítópult.

A kód értelmezése

Ügyfélprojekt

Az ügyfélprojekt:

  • Ugyanazt a kapcsolat string logikát használja, mint a dolgozó
  • Egy szekvenciális ütemezőt implementál, amely lehetővé teszi, hogy:
    • Egyenként 20 vezénylési példányt ütemez.
    • 5 másodperc várakozás az egyes vezénylések ütemezése között
    • Nyomon követi a listában szereplő összes orkesztrációs folyamatot
    • Várakozás az összes vezénylés befejezésére a kilépés előtt
  • Szabványos naplózás használatával jeleníti meg az előrehaladást és az eredményeket
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

Dolgozói projekt

A Munkás projekt a következőket tartalmazza:

  • GreetingOrchestration.cs: A vezénylési és tevékenységfüggvények meghatározása egyetlen fájlban
  • Program.cs: A feldolgozó gazdagép beállítása megfelelő kapcsolati karakterlánc kezeléssel

Vezénylés implementálása

A vezénylés közvetlenül meghívja az egyes tevékenységeket sorrendben a standard CallActivityAsync módszerrel:

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Minden tevékenység egy külön osztályként van megvalósítva, az [DurableTask] attribútummal díszítve:

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

A munkás a Microsoft.Extensions.Hosting használja megfelelő életciklus-kezeléshez.

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Ügyfél

Az ügyfélprojekt:

  • Ugyanazt a kapcsolat string logikát használja, mint a dolgozó
  • Egy szekvenciális ütemezőt implementál, amely lehetővé teszi, hogy:
    • Egyenként 20 vezénylési példányt ütemez.
    • 5 másodperc várakozás az egyes vezénylések ütemezése között
    • Nyomon követi a listában szereplő összes orkesztrációs folyamatot
    • Várakozás az összes vezénylés befejezésére a kilépés előtt
  • Szabványos naplózás használatával jeleníti meg az előrehaladást és az eredményeket
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Munkás

Vezénylés implementálása

Az orchesztráció közvetlenül hívja meg az egyes tevékenységeket sorrendben a standard call_activity függvény használatával.

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Minden tevékenység külön függvényként van implementálva:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

A munkás a DurableTaskSchedulerWorker használja megfelelő életciklus-kezeléshez.

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

A példakonténer-alkalmazás tartalmazza mind a feldolgozó, mind a klienskódot.

Ügyfél

Az ügyfélkód:

  • Ugyanazt a kapcsolat string logikát használja, mint a dolgozó
  • Egy szekvenciális ütemezőt implementál, amely lehetővé teszi, hogy:
    • Egyenként 20 vezénylési példányt ütemez.
    • 5 másodperc várakozás az egyes vezénylések ütemezése között
    • Nyomon követi a listában szereplő összes orkesztrációs folyamatot
    • Várakozás az összes vezénylés befejezésére a kilépés előtt
  • Szabványos naplózás használatával jeleníti meg az előrehaladást és az eredményeket
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Munkás

A vezénylés közvetlenül meghívja az egyes tevékenységeket sorrendben a standard callActivity módszerrel:

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Ügyfél

Az ügyfélkód:

  • Ugyanazt a kapcsolat string logikát használja, mint a dolgozó
  • Egy szekvenciális ütemezőt implementál, amely lehetővé teszi, hogy:
    • Egyenként 20 vezénylési példányt ütemez.
    • 5 másodperc várakozás az egyes vezénylések ütemezése között
    • Nyomon követi a listában szereplő összes orkesztrációs folyamatot
    • Várakozás az összes vezénylés befejezésére a kilépés előtt
  • Szabványos naplózás használatával jeleníti meg az előrehaladást és az eredményeket
const TOTAL_ORCHESTRATIONS = Number(process.env.TOTAL_ORCHESTRATIONS ?? 20);
const INTERVAL_SECONDS = Number(process.env.ORCHESTRATION_INTERVAL ?? 5);

const orchestrationIds = [];

for (let index = 0; index < TOTAL_ORCHESTRATIONS; index += 1) {
    const orchestrationInput = `${baseName}_${index + 1}`;

    const instanceId = await client.scheduleNewOrchestration(
        "functionChainingOrchestrator",
        orchestrationInput
    );

    orchestrationIds.push(instanceId);

    if (index < TOTAL_ORCHESTRATIONS - 1) {
        await sleep(INTERVAL_SECONDS * 1000);
    }
}

for (const instanceId of orchestrationIds) {
    const state = await client.waitForOrchestrationCompletion(instanceId, true, 120);
}

Munkás

Vezénylés implementálása

A vezénylés közvetlenül meghívja az egyes tevékenységeket sorrendben a standard callActivity módszerrel:

const functionChainingOrchestrator = async function* functionChainingOrchestrator(ctx, name) {
    const greeting = yield ctx.callActivity(sayHello, name);
    const processedGreeting = yield ctx.callActivity(processGreeting, greeting);
    const finalResponse = yield ctx.callActivity(finalizeResponse, processedGreeting);

    return finalResponse;
};

Minden tevékenység külön függvényként van implementálva:

const sayHello = async (_ctx, name) => {
    const safeName = typeof name === "string" && name.length ? name : "User";
    return `Hello ${safeName}!`;
};

const processGreeting = async (_ctx, greeting) => {
    const value = typeof greeting === "string" ? greeting : "Hello User!";
    return `${value} How are you today?`;
};

const finalizeResponse = async (_ctx, response) => {
    const value = typeof response === "string" ? response : "Hello User! How are you today?";
    return `${value} I hope you're doing well!`;
};

A munkás a createAzureManagedWorkerBuilder használja megfelelő életciklus-kezeléshez.

worker = getWorkerBuilder()
    .addOrchestrator(functionChainingOrchestrator)
    .addActivity(sayHello)
    .addActivity(processGreeting)
    .addActivity(finalizeResponse)
    .build();

await worker.start();

Erőforrások tisztítása

A tesztelés befejezése után távolítsa el az üzembe helyezett erőforrásokat:

azd down