Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfa, Microsoft Agent Framework İş Akışı sistemindeki Denetim Noktalarına genel bir bakış sağlar.
Genel Bakış
Denetim noktaları, yürütme sırasında belirli noktalarda iş akışının durumunu kaydetmenize ve daha sonra bu noktalardan devam etmenizi sağlar. Bu özellik özellikle aşağıdaki senaryolar için kullanışlıdır:
- Uzun süre devam eden ve hata durumunda ilerlemeyi kaybetmek istemediğiniz iş akışları.
- Yürütmeyi daha sonra duraklatmak ve devam ettirmek istediğiniz uzun süreli iş akışları.
- Denetim veya uyumluluk amacıyla düzenli olarak durum kaydetme gerektiren iş akışları.
- Farklı ortamlar veya örnekler arasında taşınması gereken iş akışları.
Denetim Noktaları Ne Zaman Oluşturulur?
İş akışlarının süper adımlar olarak yürütüldüğünü, temel kavramlarda belgelenmiş olduğunu unutmayın. Denetim noktaları, bu üst adımdaki tüm yürütücüler yürütmelerini tamamladıktan sonra her üst adımın sonunda oluşturulur. Denetim noktası iş akışının durumunun tamamını yakalar, örneğin:
- Tüm yürütücülerin geçerli durumu
- Sonraki üst adım için iş akışında bekleyen tüm iletiler
- Bekleyen istekler ve yanıtlar
- Paylaşılan durumlar
Denetim Noktalarını Yakalama
Kontrol noktalarını etkinleştirmek için bir iş akışı çalıştırması oluşturulurken bir CheckpointManager sağlanması gerekir.
SuperStepCompletedEvent aracılığıyla bir kontrol noktasına erişilebilir.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint and store it
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint != null)
{
checkpoints.Add(checkpoint);
}
}
}
Denetim işaretlerini etkinleştirmek için iş akışı oluştururken bir CheckpointStorage sağlanmalıdır. Ardından bir denetim noktasına depolama üzerinden erişilebilir.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run_streaming(input):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
Kontrol Noktalarından Devam Etme
Belirli bir denetim noktasından bir iş akışını aynı çalışmada doğrudan sürdürebilirsiniz.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Belirli bir denetim noktasından bir iş akışını doğrudan aynı iş akışı örneğinde sürdürebilirsiniz.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
...
Denetim Noktalarından Yeniden Doldurma
Veya bir iş akışını bir denetim noktasından yeni bir çalıştırma örneğine yeniden yükleyebilirsiniz.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Veya bir kontrol noktasından yeni bir iş akışı örneğini yeniden başlatabilirsiniz.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
...
Yürütücü Durumlarını Kaydet
Bir yürütücü durumunun bir denetim noktasında yakalanmasını sağlamak için, yürütücü OnCheckpointingAsync yöntemini geçersiz kılmalı ve durumunu iş akışı bağlamına kaydetmelidir.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi geçersiz kılmalı OnCheckpointRestoredAsync ve iş akışı bağlamından durumunu yüklemelidir.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Bir yürütücü durumunun bir denetim noktasında yakalanmasını sağlamak için, yürütücü on_checkpoint_save yöntemini geçersiz kılmalı ve durumunu iş akışı bağlamına kaydetmelidir.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi geçersiz kılmalı on_checkpoint_restore ve iş akışı bağlamından durumunu yüklemelidir.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])