Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini menyediakan gambaran umum Checkpoints dalam sistem Alur Kerja Kerangka Kerja Agen Microsoft.
Gambaran Umum
Titik pemeriksaan memungkinkan Anda menyimpan status alur kerja pada titik tertentu selama eksekusinya, dan melanjutkan dari titik-titik tersebut nanti. Fitur ini sangat berguna untuk skenario berikut:
- Alur kerja yang berjalan lama di mana Anda ingin menghindari kehilangan kemajuan jika terjadi kegagalan.
- Alur kerja yang berjalan lama di mana Anda ingin menjeda dan melanjutkan eksekusi di lain waktu.
- Alur kerja yang memerlukan penyimpanan status berkala untuk tujuan audit atau kepatuhan.
- Alur kerja yang perlu dimigrasikan di berbagai lingkungan atau instans.
Kapan Titik Pemeriksaan Dibuat?
Ingatlah bahwa alur kerja dijalankan dalam supersteps, sebagaimana didokumentasikan dalam konsep inti. Titik pemeriksaan dibuat di akhir setiap superstep, setelah semua pelaksana dalam superstep tersebut telah menyelesaikan eksekusi mereka. Titik pemeriksaan menangkap seluruh status alur kerja, termasuk:
- Status saat ini dari semua pelaksana
- Semua pesan yang tertunda dalam alur kerja untuk superstep berikutnya
- Permintaan dan respons tertunda
- Status bersama
Menangkap Titik Pemeriksaan
Untuk mengaktifkan titik pemeriksaan, CheckpointManager perlu disediakan saat menjalankan alur kerja. Titik pemeriksaan kemudian dapat diakses melalui SuperStepCompletedEvent, atau melalui properti Checkpoints pada run.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Untuk mengaktifkan titik pemeriksaan, CheckpointStorage perlu disediakan saat membuat alur kerja. Titik pemeriksaan kemudian dapat diakses melalui penyimpanan. Agent Framework mengirimkan tiga implementasi bawaan — pilih salah satu yang sesuai dengan durabilitas dan kebutuhan penyebaran Anda:
| Provider | Package | Durability | Paling cocok untuk |
|---|---|---|---|
InMemoryCheckpointStorage |
agent-framework |
Hanya dalam pemrosesan | Pengujian, demo, alur kerja sementara |
FileCheckpointStorage |
agent-framework |
Disk lokal | Alur kerja mesin tunggal, pengembangan lokal |
CosmosCheckpointStorage |
agent-framework-azure-cosmos |
Azure Cosmos DB | Alur kerja produksi, terdistribusi, lintas proses |
Ketiganya menerapkan protokol yang sama CheckpointStorage , sehingga Anda dapat menukar penyedia tanpa mengubah alur kerja atau kode pelaksana.
- In-Memory (Dalam Memori)
- File
- Azure Cosmos DB
InMemoryCheckpointStorage menyimpan titik pemeriksaan dalam memori proses. Terbaik untuk pengujian, demo, dan alur kerja berumur pendek di mana Anda tidak memerlukan ketahanan saat melakukan restart.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Lanjutkan dari Titik Pemeriksaan
Anda dapat melanjutkan alur kerja dari titik pemeriksaan tertentu langsung dalam proses yang sama.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Anda dapat melanjutkan alur kerja dari checkpoint tertentu langsung pada instans alur kerja yang sama.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Rehidrasi dari Titik Pemeriksaan
Atau Anda dapat merehidrasi alur kerja dari titik pemeriksaan ke dalam instans eksekusi baru.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Atau Anda dapat merehidrasi instans alur kerja baru dari titik pemeriksaan.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Simpan Status Pelaksana
Untuk memastikan bahwa status pelaksana diambil dalam titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointingAsync metode dan menyimpan statusnya ke konteks alur kerja.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointRestoredAsync metode dan memuat statusnya dari konteks alur kerja.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Untuk memastikan bahwa status pelaksana ditangkap dalam titik pemeriksaan, pelaksana harus mengambil on_checkpoint_save alih metode dan mengembalikan statusnya sebagai kamus.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih on_checkpoint_restore metode dan memulihkan statusnya dari kamus status yang disediakan.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])
Pertimbangan Keamanan
Penting
Penyimpanan checkpoint berfungsi sebagai batas keamanan kepercayaan. Baik Anda menggunakan implementasi penyimpanan bawaan atau yang kustom, backend penyimpanan harus diperlakukan sebagai infrastruktur privat tepercaya. Jangan pernah memuat titik pemeriksaan dari sumber yang tidak terpercaya atau berpotensi telah diubah.
Pastikan bahwa lokasi penyimpanan yang digunakan untuk titik pemeriksaan diamankan dengan tepat. Hanya layanan dan pengguna yang berwenang yang seharusnya memiliki akses baca atau tulis ke data titik pemeriksaan.
Serialisasi Pickle
Baik FileCheckpointStorage dan CosmosCheckpointStorage menggunakan modul pickle Python untuk menserialisasikan status non-JSON-asli seperti kelas data, tanggalwaktu, dan objek kustom. Untuk mengurangi risiko eksekusi kode arbitrer selama deserialisasi, kedua penyedia menggunakan unpickler terbatas secara bawaan. Hanya sekumpulan jenis-jenis Python aman bawaan (primitif, datetime, uuid, Decimal, koleksi-koleksi yang umum, dll.) dan semua jenis-jenis internal agent_framework diizinkan selama deserialisasi. Jenis lain yang ditemui dalam titik pemeriksaan menyebabkan deserialisasi gagal dengan WorkflowCheckpointException.
Untuk mengizinkan jenis khusus aplikasi tambahan, lewatkan melalui parameter allowed_checkpoint_types dengan menggunakan format "module:qualname".
from agent_framework import FileCheckpointStorage
storage = FileCheckpointStorage(
"/tmp/checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
CosmosCheckpointStorage menerima parameter yang sama:
from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage
storage = CosmosCheckpointStorage(
endpoint="https://my-account.documents.azure.com:443/",
credential=DefaultAzureCredential(),
database_name="agent-db",
container_name="checkpoints",
allowed_checkpoint_types=[
"my_app.models:SafeState",
"my_app.models:UserProfile",
],
)
Jika model ancaman Anda tidak mengizinkan serialisasi berbasis pickle sama sekali, gunakan InMemoryCheckpointStorage atau menetapkan CheckpointStorage melalui strategi serialisasi alternatif.
Tanggung jawab lokasi penyimpanan
FileCheckpointStorage memerlukan parameter eksplisit storage_path — tidak ada direktori default. Meskipun kerangka kerja memvalidasi terhadap serangan traversal path, tanggung jawab pengembang adalah mengamankan direktori penyimpanan itu sendiri, termasuk izin file, enkripsi dalam keadaan diam, dan kontrol akses. Hanya proses yang berwenang yang seharusnya memiliki akses baca atau tulis ke direktori titik pemeriksaan.
CosmosCheckpointStorage bergantung pada Azure Cosmos DB untuk penyimpanan. Gunakan identitas terkelola / RBAC jika memungkinkan, tetapkan cakupan database dan kontainer ke dalam layanan alur kerja, dan rotasikan kunci akun jika menggunakan autentikasi berbasis kunci. Seperti halnya penyimpanan file, hanya pihak yang berwenang yang seharusnya memiliki akses baca atau tulis ke kontainer Cosmos DB yang menyimpan dokumen penanda posisi.