Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka obsahuje přehled Checkpoints v systému pracovních postupů Microsoft Agent Framework.
Přehled
Kontrolní body umožňují uložit stav pracovního postupu v konkrétních bodech během jeho provádění a pokračovat z těchto bodů později. Tato funkce je užitečná zejména pro následující scénáře:
- Dlouhotrvající pracovní postupy, ve kterých chcete zabránit ztrátě průběhu v případě selhání.
- Dlouhotrvající pracovní postupy, ve kterých chcete pozastavit a obnovit spouštění později.
- Pracovní postupy, které vyžadují pravidelné ukládání stavu pro účely auditování nebo dodržování předpisů
- Pracovní postupy, které je potřeba migrovat napříč různými prostředími nebo instancemi
Kdy jsou vytvořeny kontrolní body?
Mějte na paměti, že pracovní postupy se provádějí v superkrocích, jak je uvedeno v základních konceptech. Kontrolní body se vytvoří na konci každého superkroku poté, co všechny exekutory v daném superkroku dokončily provádění. Kontrolní bod zachycuje celý stav pracovního postupu, včetně:
- Aktuální stav všech exekutorů
- Všechny čekající zprávy ve workflow pro další superstep
- Čekající žádosti a odpovědi
- Sdílené stavy
Zachytávání kontrolních bodů
Pokud chcete povolit vytváření kontrolních bodů, je potřeba poskytnout CheckpointManager při spuštění pracovního postupu. K bodu obnovení je pak možné přistupovat prostřednictvím SuperStepCompletedEvent nebo vlastnosti Checkpoints při spuštění.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Pro povolení vytváření kontrolních bodů je třeba poskytnout CheckpointStorage při vytváření pracovního postupu. K kontrolnímu bodu se pak dostanete přes úložiště. Agent Framework dodává tři předdefinované implementace – vyberte si ty, které odpovídají vašim potřebám odolnosti a nasazení:
| Poskytovatel | Balíček | Durability | Nejvhodnější pro |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
Pouze během procesu | Testy, ukázky, krátkodobé pracovní postupy |
FileCheckpointStorage |
agent-framework |
Místní disk | Pracovní postupy s jedním počítačem, místní vývoj |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Produkční, distribuované pracovní postupy napříč procesy |
Všechny tři implementují stejný CheckpointStorage protokol, takže můžete prohodit poskytovatele beze změny kódu pracovního postupu nebo exekutoru.
InMemoryCheckpointStorage uchovává kontrolní body v paměti procesu. Nejvhodnější pro testy, ukázky a krátkodobé pracovní postupy, ve kterých nepotřebujete odolnost při restartování.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Obnovení z kontrolních bodů
Pracovní postup můžete obnovit z konkrétního kontrolního bodu přímo v tom samém běhu.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Pracovní postup můžete obnovit z konkrétního kontrolního bodu přímo ve stejné instanci pracovního postupu.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Obnovení z kontrolních bodů
Nebo můžete pracovní postup obnovit z kontrolního bodu do nové spustitelné instance.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Nebo můžete obnovit novou instanci pracovního postupu z kontrolního bodu.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Stavy exekutoru
Aby se zajistilo, že se stav exekutoru zachytí do kontrolního bodu, musí exekutor přepsat OnCheckpointingAsync metodu a uložit svůj stav do kontextu pracovního postupu.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Aby se zajistilo, že se stav při obnovení z kontrolního bodu správně obnoví, musí exekutor přepsat OnCheckpointRestoredAsync metodu a načíst její stav z kontextu pracovního postupu.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Aby se zajistilo, že se stav exekutoru zachytí do kontrolního bodu, musí exekutor přepsat on_checkpoint_save metodu a vrátit její stav jako slovník.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
K zajištění správného obnovení stavu při obnově z kontrolního bodu musí exekutor přepsat on_checkpoint_restore metodu a obnovit stav ze zadaného slovníku stavu.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Aspekty zabezpečení
Důležité
Úložiště kontrolních bodů je hranice důvěryhodnosti. Bez ohledu na to, jestli používáte integrované implementace úložiště nebo vlastní, musí být back-end úložiště považován za důvěryhodnou privátní infrastrukturu. Nikdy nenačítejte kontrolní body z nedůvěryhodných nebo potenciálně manipulovaných zdrojů.
Ujistěte se, že je umístění úložiště používané pro kontrolní body správně zabezpečené. Pouze autorizované služby a uživatelé by měli mít přístup ke čtení nebo zápisu k datům kontrolního bodu.
Serializace Pickle
Modul FileCheckpointStorage i CosmosCheckpointStorage používá modul Python pickle k serializaci jiného než nativního stavu JSON, jako jsou datové třídy, data a časy a vlastní objekty. Aby se zmírnila rizika spuštění libovolného kódu během deserializace, oba poskytovatelé ve výchozím nastavení používají omezený unpickler. Během deserializace jsou povoleny pouze předdefinované sady bezpečných typů Python (primitiv, datetime, uuid, Decimal, běžné kolekce atd.) a všechny agent_framework interní typy. Jakýkoli jiný typ zjištěný v kontrolním bodu způsobí, že deserializace selže s chybou WorkflowCheckpointException.
Pokud chcete povolit další typy specifické pro aplikaci, předejte je prostřednictvím parametru allowed_checkpoint_types pomocí "module:qualname" formátu:
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage přijímá stejný parametr:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Pokud váš model hrozeb nepovoluje serializaci založenou na pickle, použijte InMemoryCheckpointStorage nebo implementujte vlastní CheckpointStorage s alternativní strategií serializace.
Odpovědnost za umístění úložiště
FileCheckpointStorage vyžaduje explicitní storage_path parametr – neexistuje žádný výchozí adresář. I když rámec ověřuje útoky proti procházení cest, zabezpečení samotného adresáře úložiště (oprávnění k souborům, šifrování dat v klidu, řízení přístupu) je zodpovědností vývojáře. K adresáři kontrolních bodů by měly mít přístup jen autorizované procesy pro čtení nebo zápis.
CosmosCheckpointStorage spoléhá na úložiště Azure Cosmos DB. Pokud je to možné, použijte spravovanou identitu nebo RBAC, omezte rozsah databáze a kontejneru na službu pracovního postupu a měňte klíče účtu pravidelně, pokud používáte ověřování založené na klíčích. Stejně jako u úložiště souborů by k kontejneru Cosmos DB, který obsahuje dokumenty kontrolních bodů, měly mít přístup jen autorizované subjekty.