Hızlı Başlangıç: Azure Container Apps'te Dayanıklı Görev SDK'sı uygulaması barındırma

Önemli

Şu anda PowerShell Dayanıklı Görev SDK'sı kullanılamıyor.

Bu hızlı başlangıçta şunları yapmayı öğrenirsiniz:

  • Dayanıklı Görev Zamanlayıcı örnek projesini kopyalayın ve hazırlayın.
  • Azure Developer CLI kullanarak çalışan ve istemci uygulamalarını Azure Container Apps dağıtın.
  • Azure Container Apps günlük akışlarını kullanarak dağıtımı doğrulayın.
  • Dayanıklı Görev Zamanlayıcı panosu aracılığıyla düzenleme durumunu ve geçmişini gözden geçirin.

Önkoşullar

Başlamadan önce:

Projeyi hazırlama

Yeni bir terminal penceresinde, kopyalanan Azure-Samples/Durable-Task-Scheduler dizininden işlev zincirleme örneğine gidin:

cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
cd /samples/durable-task-sdks/javascript/function-chaining

Azure Geliştirici CLI'sı kullanarak dağıtma

Azure Geliştirici CLI'sı (azd) tüm gerekli Azure altyapısını sağlar ve hem çalışan hem de istemci uygulamalarını tek bir komutla dağıtır.

  1. Tek bir komutla altyapıyı sağlamak ve uygulamayı Azure Container Apps'e dağıtmak için komutunu çalıştırın azd up .

    azd up
    
  2. Terminalde istendiğinde aşağıdaki parametreleri sağlayın.

    Parametre Açıklama
    Ortam Adı Tüm Azure kaynaklarını barındırmak için oluşturulan kaynak grubunun ön eki.
    Azure Konumu Kaynaklarınızın Azure konumu.
    Azure aboneliği Kaynaklarınız için Azure aboneliği.

    Bu işlemin tamamlanması biraz zaman alabilir. azd up Komut tamamlandıktan sonra CLI çıkışında dağıtım ilerleme durumunu izlemek için iki Azure portalı bağlantısı görüntülenir. Çıktı, azd up ile ilgili nasıl olduğunu da gösterir.

    • ./infra dizininde sağlanan Bicep dosyaları aracılığıyla azd provision kullanarak gerekli tüm Azure kaynaklarını oluşturur ve yapılandırır. Azure Geliştirici CLI'sı tarafından sağlandıktan sonra bu kaynaklara Azure portalı üzerinden erişebilirsiniz. Azure kaynaklarını sağlayan dosyalar şunlardır:
      • main.parameters.json
      • main.bicep
      • app İşlevselliğe göre düzenlenmiş bir kaynak dizini
      • core şablonu tarafından kullanılan Bicep modüllerini içeren azd başvuru kitaplığı
    • Kodu azd deploy kullanarak dağıtır

    Beklenen çıkış

    Packaging services (azd package)
    
    (✓) Done: Packaging service client
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    (✓) Done: Packaging service worker
    - Image Hash: {IMAGE_HASH}
    - Target Image: {TARGET_IMAGE}
    
    
    Provisioning Azure resources (azd provision)
    Provisioning Azure resources can take some time.
    
    Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID)
    Location: West US 2
    
     You can view detailed progress in the Azure portal:
     https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT
    
     (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s)
     (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s)
     (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s)
     (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s)
     (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s)   
    
    Deploying services (azd deploy)
    
     (✓) Done: Deploying service client
     - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/
    
     (✓) Done: Deploying service worker
     - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/
    
    
    SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.   
    

Başarılı dağıtımı onaylayın

Azure portalında düzenlemelerin başarıyla çalıştığını doğrulayın.

  1. Kaynak grubu adını terminal çıkışından kopyalayın.

  2. Azure portalında oturum açın ve bu kaynak grubu adını arayın.

  3. Kaynak grubuna genel bakış sayfasından istemci kapsayıcı uygulaması kaynağını seçin.

  4. İzleme>Günlük akışı'nı seçin.

  5. Örnek kapsayıcı uygulamasının işlev zincirleme görevlerini günlüğe kaydetmesini onaylayın.

     Azure portalında Java örnek uygulamanın günlük akışının ekran görüntüsü.

  1. Kaynak grubu adını terminal çıkışından kopyalayın.

  2. Azure portalında oturum açın ve bu kaynak grubu adını arayın.

  3. Kaynak grubuna genel bakış sayfasından istemci kapsayıcı uygulaması kaynağını seçin.

  4. İzleme>Günlük akışı'nı seçin.

  5. İstemci kapsayıcısının işlev zincirleme görevlerini günlüğe kaydedip kaydetmediğini doğrulayın.

    Azure portalında istemci kapsayıcısının günlük akışının ekran görüntüsü.

  6. Kaynağı grubu sayfasına geri dönün ve worker kapsayıcıyı seçin.

  7. İzleme>Günlük akışı'nı seçin.

  8. Çalışan kapsayıcısının işlev zincirleme görevlerini kaydettiğini onaylayın.

    Azure portalında worker container'ın günlük akışının ekran görüntüsü.

Dayanıklı Görev Zamanlayıcı panosunu kullanarak düzenleme durumunu ve geçmişini de gözden geçirebilirsiniz. Daha fazla bilgi için bkz Dayanıklı Görev Zamanlayıcı panosu.

Kodu anlama

Müşteri Projesi

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır
  • Aşağıdakilere sahip sıralı bir düzenleme zamanlayıcısı uygular:
    • Birer birer 20 orkestrasyon örneğini planlar
    • Her düzenlemeyi zamanlama arasında 5 saniye bekler
    • Listedeki tüm düzenleme örneklerini izler
    • Çıkmadan önce tüm düzenlemelerin tamamlanmasını bekler
  • İlerleme durumunu ve sonuçları göstermek için standart günlüğü kullanır
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
    // Create a unique instance ID
    string instanceName = $"{name}_{i+1}";

    // Schedule the orchestration
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
        "GreetingOrchestration", 
        instanceName);

    // Wait 5 seconds before scheduling the next one
    await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}

// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
    OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
        id, getInputsAndOutputs: false, CancellationToken.None);
}

İşçi Projesi

Çalışan projesi aşağıdakileri içerir:

  • GreetingOrchestration.cs: Tek bir dosyada düzenleyici ve etkinlik işlevlerini tanımlar
  • Program.cs: Çalışan host'u uygun bağlantı dizesi yönetimi ile ayarlar

Orkestrasyon Uygulaması

Orkestrasyon, standart CallActivityAsync yöntemi kullanarak her etkinliği sırayla doğrudan çağırır.

public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
    // Step 1: Say hello to the person
    string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

    // Step 2: Process the greeting
    string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

    // Step 3: Finalize the response
    string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

    return finalResponse;
}

Her etkinlik, özniteliğiyle [DurableTask] süslenmiş ayrı bir sınıf olarak uygulanır:

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    // Implementation details
}

Çalışan, uygun yaşam döngüsü yönetimi için kullanır Microsoft.Extensions.Hosting :

var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
    .AddTasks(registry => {
        registry.AddAllGeneratedTasks();
    })
    .UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();

Müşteri

Müşteri Projesi:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır
  • Aşağıdakilere sahip sıralı bir düzenleme zamanlayıcısı uygular:
    • Birer birer 20 orkestrasyon örneğini planlar
    • Her düzenlemeyi zamanlama arasında 5 saniye bekler
    • Listedeki tüm düzenleme örneklerini izler
    • Çıkmadan önce tüm düzenlemelerin tamamlanmasını bekler
  • İlerleme durumunu ve sonuçları göstermek için standart günlüğü kullanır
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
    try:
        # Create a unique instance name
        instance_name = f"{name}_{i+1}"
        logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")

        # Schedule the orchestration
        instance_id = client.schedule_new_orchestration(
            "function_chaining_orchestrator",
            input=instance_name
        )

        instance_ids.append(instance_id)
        logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")

        # Wait before scheduling next orchestration (except for the last one)
        if i < TOTAL_ORCHESTRATIONS - 1:
            logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
        await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
    try:
        logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
        result = client.wait_for_orchestration_completion(
            instance_id,
            timeout=120
        )

Çalışan

Orkestrasyon Uygulaması

Düzenleme, standart call_activity işlevi kullanarak her etkinliği sırayla doğrudan çağırır:

# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
    """Orchestrator that demonstrates function chaining pattern."""
    logger.info(f"Starting function chaining orchestration for {name}")

    # Call first activity - passing input directly without named parameter
    greeting = yield ctx.call_activity('say_hello', input=name)

    # Call second activity with the result from first activity
    processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)

    # Call third activity with the result from second activity
    final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)

    return final_response

Her etkinlik ayrı bir işlev olarak uygulanır:

# Activity functions
def say_hello(ctx, name: str) -> str:
    """First activity that greets the user."""
    logger.info(f"Activity say_hello called with name: {name}")
    return f"Hello {name}!"

def process_greeting(ctx, greeting: str) -> str:
    """Second activity that processes the greeting."""
    logger.info(f"Activity process_greeting called with greeting: {greeting}")
    return f"{greeting} How are you today?"

def finalize_response(ctx, response: str) -> str:
    """Third activity that finalizes the response."""
    logger.info(f"Activity finalize_response called with response: {response}")
    return f"{response} I hope you're doing well!"

Çalışan, uygun yaşam döngüsü yönetimi için kullanır DurableTaskSchedulerWorker :

with DurableTaskSchedulerWorker(
    host_address=host_address, 
    secure_channel=endpoint != "http://localhost:8080",
    taskhub=taskhub_name, 
    token_credential=credential
) as worker:

    # Register activities and orchestrators
    worker.add_activity(say_hello)
    worker.add_activity(process_greeting)
    worker.add_activity(finalize_response)
    worker.add_orchestrator(function_chaining_orchestrator)

    # Start the worker (without awaiting)
    worker.start()

Örnek kapsayıcı uygulaması hem çalışan hem de istemci kodunu içerir.

Müşteri

İstemci kodu:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır
  • Aşağıdakilere sahip sıralı bir düzenleme zamanlayıcısı uygular:
    • Birer birer 20 orkestrasyon örneğini planlar
    • Her düzenlemeyi zamanlama arasında 5 saniye bekler
    • Listedeki tüm düzenleme örneklerini izler
    • Çıkmadan önce tüm düzenlemelerin tamamlanmasını bekler
  • İlerleme durumunu ve sonuçları göstermek için standart günlüğü kullanır
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null 
    ? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();

// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
        "ActivityChaining",
        new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);

// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
        instanceId,
        Duration.ofSeconds(30),
        true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))

Çalışan

Orkestrasyon, standart callActivity yöntemi kullanarak her etkinliği sırayla doğrudan çağırır.

DurableTaskGrpcWorker worker = (credential != null 
    ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
    : DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
    .addOrchestration(new TaskOrchestrationFactory() {
        @Override
        public String getName() { return "ActivityChaining"; }

        @Override
        public TaskOrchestration create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                String x = ctx.callActivity("Reverse", input, String.class).await();
                String y = ctx.callActivity("Capitalize", x, String.class).await();
                String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
                ctx.complete(z);
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Reverse"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                StringBuilder builder = new StringBuilder(input);
                builder.reverse();
                return builder.toString();
            };
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "Capitalize"; }

        @Override
        public TaskActivity create() {
            return ctx -> ctx.getInput(String.class).toUpperCase();
        }
    })
    .addActivity(new TaskActivityFactory() {
        @Override
        public String getName() { return "ReplaceWhitespace"; }

        @Override
        public TaskActivity create() {
            return ctx -> {
                String input = ctx.getInput(String.class);
                return input.trim().replaceAll("\\s", "-");
            };
        }
    })
    .build();

// Start the worker
worker.start();

Müşteri

İstemci kodu:

  • Çalışanla aynı bağlantı dizesi mantığını kullanır
  • Aşağıdakilere sahip sıralı bir düzenleme zamanlayıcısı uygular:
    • Birer birer 20 orkestrasyon örneğini planlar
    • Her düzenlemeyi zamanlama arasında 5 saniye bekler
    • Listedeki tüm düzenleme örneklerini izler
    • Çıkmadan önce tüm düzenlemelerin tamamlanmasını bekler
  • İlerleme durumunu ve sonuçları göstermek için standart günlüğü kullanır
const TOTAL_ORCHESTRATIONS = Number(process.env.TOTAL_ORCHESTRATIONS ?? 20);
const INTERVAL_SECONDS = Number(process.env.ORCHESTRATION_INTERVAL ?? 5);

const orchestrationIds = [];

for (let index = 0; index < TOTAL_ORCHESTRATIONS; index += 1) {
    const orchestrationInput = `${baseName}_${index + 1}`;

    const instanceId = await client.scheduleNewOrchestration(
        "functionChainingOrchestrator",
        orchestrationInput
    );

    orchestrationIds.push(instanceId);

    if (index < TOTAL_ORCHESTRATIONS - 1) {
        await sleep(INTERVAL_SECONDS * 1000);
    }
}

for (const instanceId of orchestrationIds) {
    const state = await client.waitForOrchestrationCompletion(instanceId, true, 120);
}

Çalışan

Orkestrasyon Uygulaması

Orkestrasyon, standart callActivity yöntemi kullanarak her etkinliği sırayla doğrudan çağırır.

const functionChainingOrchestrator = async function* functionChainingOrchestrator(ctx, name) {
    const greeting = yield ctx.callActivity(sayHello, name);
    const processedGreeting = yield ctx.callActivity(processGreeting, greeting);
    const finalResponse = yield ctx.callActivity(finalizeResponse, processedGreeting);

    return finalResponse;
};

Her etkinlik ayrı bir işlev olarak uygulanır:

const sayHello = async (_ctx, name) => {
    const safeName = typeof name === "string" && name.length ? name : "User";
    return `Hello ${safeName}!`;
};

const processGreeting = async (_ctx, greeting) => {
    const value = typeof greeting === "string" ? greeting : "Hello User!";
    return `${value} How are you today?`;
};

const finalizeResponse = async (_ctx, response) => {
    const value = typeof response === "string" ? response : "Hello User! How are you today?";
    return `${value} I hope you're doing well!`;
};

Çalışan, uygun yaşam döngüsü yönetimi için kullanır createAzureManagedWorkerBuilder :

worker = getWorkerBuilder()
    .addOrchestrator(functionChainingOrchestrator)
    .addActivity(sayHello)
    .addActivity(processGreeting)
    .addActivity(finalizeResponse)
    .build();

await worker.start();

Kaynakları temizle

Testi tamamladığınızda dağıtılan kaynakları kaldırın:

azd down

Sonraki Adımlar