Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Denetim noktası oluşturma, iş akışlarının durumlarını belirli noktalarda kaydetmesine ve işlem yeniden başlatıldıktan sonra bile yürütmeyi daha sonra sürdürmesine olanak tanır. Bu, uzun süre çalışan iş akışları, hata kurtarma ve döngüdeki insan senaryoları için çok önemlidir.
Ele Alınan Kavramlar
Önkoşullar
- .NET 8.0 SDK veya üzeri
- Yeni bir konsol uygulaması
Temel Bileşenler
NuGet paketlerini yükleme
İlk olarak, .NET projeniz için gerekli paketleri yükleyin:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Checkpoint Yöneticisi
CheckpointManager kontrol noktası depolama ve alma işlevselliği sağlar.
using Microsoft.Agents.AI.Workflows;
// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;
// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);
Kontrol Noktalarını Etkinleştirme
İş akışlarını InProcessExecution kullanarak yürütürken denetim noktası oluşturmayı etkinleştirin.
using Microsoft.Agents.AI.Workflows;
// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
Durum Sürekliliği
Yürütücü Durumu
Yürütücüler, Executor<T> temel sınıfını kullanarak denetim noktalarından kurtulan yerel durumu kalıcı hale getirebilir.
internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
private const string StateKey = "GuessNumberExecutor.State";
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
public GuessNumberExecutor() : this()
{
}
public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int guess = (LowerBound + UpperBound) / 2;
await context.SendMessageAsync(guess, cancellationToken);
}
/// <summary>
/// Checkpoint the current state of the executor.
/// This must be overridden to save any state that is needed to resume the executor.
/// </summary>
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);
/// <summary>
/// Restore the state of the executor from a checkpoint.
/// This must be overridden to restore any state that was saved during checkpointing.
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
(LowerBound, UpperBound) = state;
}
}
Otomatik Denetim Noktası Oluşturma
Denetim noktası yöneticisi sağlandığında her süper adımın sonunda denetim noktaları otomatik olarak oluşturulur:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case SuperStepCompletedEvent superStepCompletedEvt:
// Checkpoints are automatically created at super step boundaries
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
break;
}
}
Denetim Noktalarıyla Çalışma
Denetim Noktası Bilgilerine Erişme
Tamamlanan çalıştırmalardan denetim noktası meta verilerine erişim sağlama:
// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;
// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;
// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}
Kontrol Noktası Depolama
Denetim noktaları arabirim üzerinden CheckpointManager yönetilir:
// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);
// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);
Kontrol Noktalarından Devam Etme
Akışı Devam Ettir
Denetim noktasından yürütmeyi sürdür ve olayları gerçek zamanlı olarak akışla aktar:
// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];
await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
.ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);
await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Yayın Dışı Özgeçmiş
Devam edin ve tamamlanmasını bekleyin:
// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
.ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);
// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();
In-Place Geri Yükleme
Denetim noktasını doğrudan mevcut bir çalıştırma örneğine geri yükleyin:
// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);
// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle events as normal
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
break;
}
}
Yeni İş Akışı Örneği (Yeniden Doldurma)
Denetim noktasından yeni bir iş akışı örneği oluşturun:
// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
break;
}
}
Denetim Noktası Oluşturma ile Döngüdeki İnsan
Kontrol noktası oluşturmayı insan müdahalesi içeren iş akışlarıyla birleştirin.
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle external requests
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
break;
case SuperStepCompletedEvent superStepCompletedEvt:
// Save checkpoint after each interaction
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created after human interaction.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
return;
}
}
// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
// Continue from that point
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle remaining workflow execution
}
}
Tam Örnek Şablonu
Kapsamlı bir denetim noktası iş akışı düzeni aşağıdadır:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
public static class CheckpointingExample
{
public static async Task RunAsync()
{
// Create workflow and checkpoint manager
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
Console.WriteLine("Starting workflow with checkpointing...");
// Execute workflow with checkpointing
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
// Monitor execution and collect checkpoints
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorEvt:
Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
break;
case SuperStepCompletedEvent superStepEvt:
var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
}
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
goto FinishExecution;
}
}
FinishExecution:
Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");
// Demonstrate resuming from a checkpoint
if (checkpoints.Count > 5)
{
var selectedCheckpoint = checkpoints[5];
Console.WriteLine($"Resuming from checkpoint 6...");
// Restore to same instance
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent resumedOutputEvt)
{
Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
break;
}
}
}
// Demonstrate rehydration with new workflow instance
if (checkpoints.Count > 3)
{
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
var rehydrationCheckpoint = checkpoints[3];
Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");
await using Checkpointed<StreamingRun> newRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent rehydratedOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
break;
}
}
}
}
}
Önemli Avantajlar
- Hataya Dayanıklılık: İş akışları, son denetim noktasından devam ederek hatalardan kurtarılabilir
- Uzun Süreli İşlemler: Uzun iş akışlarını otomatik kontrol noktası sınırlarıyla yönetilebilir segmentlere bölün
- İnsan Katılımlı Süreç: Dış girdiye izin vermek için duraklat ve daha sonra kaydedilen durumdan devam et
- Hata ayıklama: Belirli noktalarda iş akışı durumunu inceleme ve test için yürütmeyi sürdürme
- Taşınabilirlik: Denetim noktaları yeni iş akışı örneklerine geri yüklenebilir (yeniden canlandırma)
- Otomatik Yönetim: Denetim noktaları süper adım sınırlarında otomatik olarak oluşturulur
Örneği Çalıştırma
Tam çalışma uygulaması için bkz. CheckpointAndResume örneği.
Temel Bileşenler
FileCheckpointStorage
sınıfı, FileCheckpointStorage JSON dosyalarını kullanarak kalıcı denetim noktası depolaması sağlar:
from agent_framework import FileCheckpointStorage
from pathlib import Path
# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
Kontrol Noktalarını Etkinleştirme
İş akışınızı oluştururken denetim noktası oluşturmayı etkinleştirin:
from agent_framework import WorkflowBuilder
workflow = (
WorkflowBuilder(max_iterations=5)
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build()
)
Durum Sürekliliği
Yürütücü Durumu
Yürütücüler, denetim noktalarından geçebilen yerel durumu kalıcı hale getirebilir.
from agent_framework import Executor, WorkflowContext, handler
class WorkerExecutor(Executor):
"""Processes numbers to compute their factor pairs and manages executor state for checkpointing."""
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}
@handler
async def compute(
self,
task: ComputeTask,
ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
) -> None:
"""Process the next number in the task, computing its factor pairs."""
next_number = task.remaining_numbers.pop(0)
print(f"WorkerExecutor: Computing factor pairs for {next_number}")
pairs: list[tuple[int, int]] = []
for i in range(1, next_number):
if next_number % i == 0:
pairs.append((i, next_number // i))
self._composite_number_pairs[next_number] = pairs
if not task.remaining_numbers:
# All numbers processed - output the results
await ctx.yield_output(self._composite_number_pairs)
else:
# More numbers to process - continue with remaining task
await ctx.send_message(task)
@override
async def on_checkpoint_save(self) -> dict[str, Any]:
"""Save the executor's internal state for checkpointing."""
return {"composite_number_pairs": self._composite_number_pairs}
@override
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""Restore the executor's internal state from a checkpoint."""
self._composite_number_pairs = state.get("composite_number_pairs", {})
Denetim Noktalarıyla Çalışma
Denetim Noktalarını Listeleme
Kullanılabilir denetim noktalarını alın ve inceleyin:
# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()
# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")
# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)
Kontrol Noktalarından Devam Etme
Akışı Devam Ettir
Yürütmeyi sürdür ve olayları gerçek zamanlı olarak akışla aktar:
# Resume from a specific checkpoint
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Resumed Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Final Result: {event.data}")
break
Yayın Dışı Özgeçmiş
Bir kerede devam edin ve tüm sonuçları alın:
# Resume and wait for completion
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")
Bekleyen İsteklerle Devam Et
Bekleyen istekleri içeren bir denetim noktasından devam ederken, iş akışı bu istek olaylarını tekrar tetikleyerek bunları yakalamanıza ve yanıtlamanıza olanak tanır:
request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
# Capture re-emitted pending requests
print(f"Pending request re-emitted: {event.request_id}")
request_info_events.append(event)
# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed: {event.data}")
Zaten yanıtlanmış bekleyen istekler içeren bir denetim noktasından devam ediyorsanız, önceden sağlanan yanıtlarla süreci devam ettirmek için yine de run_stream()'i çağırmanız ve ardından send_responses_streaming() ile iş akışına devam etmeniz gerekir.
Etkileşimli Denetim Noktası Seçimi
Kullanıcı dostu denetim noktası seçimi oluşturun:
async def select_and_resume_checkpoint(workflow, storage):
# Get available checkpoints
checkpoints = await storage.list_checkpoints()
if not checkpoints:
print("No checkpoints available")
return
# Sort and display options
sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
print("Available checkpoints:")
for i, cp in enumerate(sorted_cps):
summary = get_checkpoint_summary(cp)
print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Get user selection
try:
idx = int(input("Enter checkpoint index: "))
selected = sorted_cps[idx]
# Resume from selected checkpoint
print(f"Resuming from checkpoint: {selected.checkpoint_id}")
async for event in workflow.run_stream(
selected.checkpoint_id,
checkpoint_storage=storage
):
print(f"Event: {event}")
except (ValueError, IndexError):
print("Invalid selection")
Tam Örnek Şablonu
Tipik bir denetim noktası oluşturma iş akışı düzeni aşağıda verilmiştir:
import asyncio
from pathlib import Path
from agent_framework import (
FileCheckpointStorage,
WorkflowBuilder,
WorkflowOutputEvent,
get_checkpoint_summary
)
async def main():
# Setup checkpoint storage
checkpoint_dir = Path("./checkpoints")
checkpoint_dir.mkdir(exist_ok=True)
storage = FileCheckpointStorage(checkpoint_dir)
# Build workflow with checkpointing
workflow = (
WorkflowBuilder()
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(storage)
.build()
)
# Initial run
print("Running workflow...")
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
# List and inspect checkpoints
checkpoints = await storage.list_checkpoints()
for cp in sorted(checkpoints, key=lambda c: c.timestamp):
summary = get_checkpoint_summary(cp)
print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Resume from a checkpoint
if checkpoints:
latest = max(checkpoints, key=lambda cp: cp.timestamp)
print(f"Resuming from: {latest.checkpoint_id}")
async for event in workflow.run_stream(latest.checkpoint_id):
print(f"Resumed: {event}")
if __name__ == "__main__":
asyncio.run(main())
Önemli Avantajlar
- Hataya Dayanıklılık: İş akışları, son denetim noktasından devam ederek hatalardan kurtarılabilir
- Uzun Süreli İşlemler: Uzun iş akışlarını kontrol noktaları ile yönetilebilir segmentlere bölün
- İnsanın Döngüde Olduğu Sistem: İnsan girişi için duraklatma ve daha sonra devam etme - devam edildiğinde bekleyen istekler yeniden iletilir
- Hata ayıklama: Belirli noktalarda iş akışı durumunu inceleme ve test için yürütmeyi sürdürme
- Kaynak Yönetimi: Kaynak kullanılabilirliğine göre iş akışlarını durdurma ve yeniden başlatma
Örneği Çalıştırma
Tam çalışma uygulaması için Özgeçmiş ile Denetim Noktası örneğine bakın.