Bagikan melalui


agent_framework Paket

Paket

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Modul

exceptions
observability

Kelas

AIFunction

Alat yang membungkus fungsi Python untuk membuatnya dapat dipanggil oleh model AI.

Kelas ini membungkus fungsi Python untuk membuatnya dapat dipanggil oleh model AI dengan validasi parameter otomatis dan pembuatan skema JSON.

Menginisialisasi AIFunction.

AgentExecutor

pelaksana bawaan yang membungkus agen untuk menangani pesan.

AgentExecutor mengadaptasi perilakunya berdasarkan mode eksekusi alur kerja:

  • run_stream(): Memancarkan peristiwa AgentRunUpdateEvent bertahap saat agen menghasilkan token
  • run(): Memancarkan satu AgentRunEvent yang berisi respons lengkap

Pelaksana secara otomatis mendeteksi mode melalui WorkflowContext.is_streaming().

Inisialisasi pelaksana dengan pengidentifikasi unik.

AgentExecutorRequest

Permintaan ke pelaksana agen.

AgentExecutorResponse

Respons dari pelaksana agen.

AgentInputRequest

Permintaan input manusia sebelum agen berjalan dalam alur kerja penyusun tingkat tinggi.

Dipancarkan melalui RequestInfoEvent saat alur kerja dijeda sebelum agen dijalankan. Respons disuntikkan ke dalam percakapan sebagai pesan pengguna untuk mengarahkan perilaku agen.

Ini adalah jenis permintaan standar yang digunakan oleh .with_request_info() pada SequentialBuilder, ConcurrentBuilder, GroupChatBuilder, dan HandoffBuilder.

AgentMiddleware

Kelas dasar abstrak untuk middleware agen yang dapat mencegat pemanggilan agen.

Middleware agen memungkinkan Anda untuk mencegat dan memodifikasi pemanggilan agen sebelum dan sesudah eksekusi. Anda dapat memeriksa pesan, memodifikasi konteks, mengambil alih hasil, atau mengakhiri eksekusi lebih awal.

Nota

AgentMiddleware adalah kelas dasar abstrak. Anda harus subkelas dan menerapkannya

metode process() untuk membuat middleware agen kustom.

AgentProtocol

Protokol untuk agen yang dapat dipanggil.

Protokol ini mendefinisikan antarmuka yang harus diterapkan semua agen, termasuk properti untuk identifikasi dan metode untuk eksekusi.

Nota

Protokol menggunakan subtipe struktural (pengetikan bebek). Kelas tidak perlu

untuk secara eksplisit mewarisi dari protokol ini agar dianggap kompatibel.

Ini memungkinkan Anda membuat agen kustom sepenuhnya tanpa menggunakan

kelas dasar Kerangka Kerja Agen apa pun.

AgentRunContext

Objek konteks untuk pemanggilan middleware agen.

Konteks ini diteruskan melalui alur middleware agen dan berisi semua informasi tentang pemanggilan agen.

Inisialisasi AgentRunContext.

AgentRunEvent

Peristiwa dipicu saat eksekusi agen selesai.

Menginisialisasi peristiwa eksekusi agen.

AgentRunResponse

Mewakili respons terhadap permintaan eksekusi Agen.

Menyediakan satu atau beberapa pesan respons dan metadata tentang respons. Respons umum akan berisi satu pesan, tetapi mungkin berisi beberapa pesan dalam skenario yang melibatkan panggilan fungsi, pengambilan RAG, atau logika kompleks.

Menginisialisasi AgentRunResponse.

AgentRunResponseUpdate

Mewakili potongan respons streaming tunggal dari Agen.

Menginisialisasi AgentRunResponseUpdate.

AgentRunUpdateEvent

Peristiwa dipicu saat agen melakukan streaming pesan.

Menginisialisasi peristiwa streaming agen.

AgentThread

Kelas utas Agen, ini dapat mewakili utas yang dikelola secara lokal atau utas yang dikelola oleh layanan.

Mempertahankan AgentThread status percakapan dan riwayat pesan untuk interaksi agen. Ini dapat menggunakan utas yang dikelola layanan (melalui service_thread_id) atau penyimpanan pesan lokal (melalui message_store), tetapi tidak keduanya.

Inisialisasi AgentThread, jangan gunakan metode ini secara manual, selalu gunakan: agent.get_new_thread().

Nota

Baik service_thread_id atau message_store dapat diatur, tetapi tidak keduanya.

AggregateContextProvider

ContextProvider yang berisi beberapa penyedia konteks.

Ini mendelegasikan peristiwa ke beberapa penyedia konteks dan menggabungkan respons dari peristiwa tersebut sebelum kembali. Ini memungkinkan Anda menggabungkan beberapa penyedia konteks ke dalam satu penyedia.

Nota

AggregateContextProvider dibuat secara otomatis saat Anda melewati satu konteks

penyedia atau urutan penyedia konteks ke konstruktor agen.

Inisialisasi AggregateContextProvider dengan penyedia konteks.

BaseAgent

Kelas dasar untuk semua agen Agent Framework.

Kelas ini menyediakan fungsionalitas inti untuk implementasi agen, termasuk penyedia konteks, dukungan middleware, dan manajemen utas.

Nota

BaseAgent tidak dapat diinstansiasi secara langsung karena tidak mengimplementasikan

run(), run_stream(), dan metode lain yang diperlukan oleh AgentProtocol.

Gunakan implementasi konkret seperti ChatAgent atau buat subkelas.

Menginisialisasi instans BaseAgent.

BaseAnnotation

Kelas dasar untuk semua jenis Anotasi AI.

Menginisialisasi BaseAnnotation.

BaseChatClient

Kelas dasar untuk klien obrolan.

Kelas dasar abstrak ini menyediakan fungsionalitas inti untuk implementasi klien obrolan, termasuk dukungan middleware, persiapan pesan, dan normalisasi alat.

Nota

BaseChatClient tidak dapat dibuat langsung karena merupakan kelas dasar abstrak.

Subkelas harus menerapkan _inner_get_response() dan _inner_get_streaming_response().

Menginisialisasi instans BaseChatClient.

BaseContent

Mewakili konten yang digunakan oleh layanan AI.

Menginisialisasi BaseContent.

Case

Pembungkus runtime menggabungkan predikat switch-case dengan targetnya.

Setiap Kasus menggandeng predikat boolean dengan pelaksana yang harus menangani pesan ketika predikat mengevaluasi ke True. Runtime menjaga kontainer ringan ini terpisah dari SwitchCaseEdgeGroupCase yang dapat diserialisasikan sehingga eksekusi dapat beroperasi dengan panggilan langsung tanpa mencemari status persisten.

ChatAgent

Agen Klien Obrolan.

Ini adalah implementasi agen utama yang menggunakan klien obrolan untuk berinteraksi dengan model bahasa. Ini mendukung alat, penyedia konteks, middleware, dan respons streaming dan non-streaming.

Menginisialisasi instans ChatAgent.

Nota

Kumpulan parameter dari frequency_penalty ke request_kwargs digunakan untuk

hubungi klien obrolan. Mereka juga dapat diteruskan ke kedua metode eksekusi.

Ketika keduanya diatur, yang diteruskan ke metode eksekusi lebih diutamakan.

ChatClientProtocol

Protokol untuk klien obrolan yang dapat menghasilkan respons.

Protokol ini mendefinisikan antarmuka yang harus diterapkan semua klien obrolan, termasuk metode untuk menghasilkan respons streaming dan non-streaming.

Nota

Protokol menggunakan subtipe struktural (pengetikan bebek). Kelas tidak perlu

untuk secara eksplisit mewarisi dari protokol ini agar dianggap kompatibel.

ChatContext

Objek konteks untuk pemanggilan middleware obrolan.

Konteks ini diteruskan melalui alur middleware obrolan dan berisi semua informasi tentang permintaan obrolan.

Inisialisasi ChatContext.

ChatMessage

Mewakili pesan obrolan.

Menginisialisasi ChatMessage.

ChatMessageStore

Implementasi dalam memori ChatMessageStoreProtocol yang menyimpan pesan dalam daftar.

Implementasi ini menyediakan penyimpanan sederhana berbasis daftar untuk pesan obrolan dengan dukungan untuk serialisasi dan deserialisasi. Ini mengimplementasikan semua metode protokol yang ChatMessageStoreProtocol diperlukan.

Penyimpanan mempertahankan pesan dalam memori dan menyediakan metode untuk menserialisasikan dan mendeserialisasi status untuk tujuan persistensi.

Buat ChatMessageStore untuk digunakan dalam utas.

ChatMessageStoreProtocol

Menentukan metode untuk menyimpan dan mengambil pesan obrolan yang terkait dengan utas tertentu.

Implementasi protokol ini bertanggung jawab untuk mengelola penyimpanan pesan obrolan, termasuk menangani data dalam volume besar dengan memotong atau meringkas pesan seperlunya.

ChatMiddleware

Kelas dasar abstrak untuk middleware obrolan yang dapat mencegat permintaan klien obrolan.

Middleware obrolan memungkinkan Anda mencegat dan memodifikasi permintaan klien obrolan sebelum dan sesudah eksekusi. Anda dapat mengubah pesan, menambahkan permintaan sistem, mencatat permintaan, atau mengambil alih respons obrolan.

Nota

ChatMiddleware adalah kelas dasar abstrak. Anda harus subkelas dan menerapkannya

metode process() untuk membuat middleware obrolan kustom.

ChatOptions

Pengaturan permintaan umum untuk layanan AI.

Menginisialisasi ChatOptions.

ChatResponse

Mewakili respons terhadap permintaan obrolan.

Menginisialisasi ChatResponse dengan parameter yang disediakan.

ChatResponseUpdate

Mewakili potongan respons streaming tunggal dari ChatClient.

Menginisialisasi ChatResponseUpdate dengan parameter yang disediakan.

CheckpointStorage

Protokol untuk backend penyimpanan titik pemeriksaan.

CitationAnnotation

Mewakili anotasi kutipan.

Inisialisasi CitationAnnotation.

ConcurrentBuilder

Penyusun tingkat tinggi untuk alur kerja agen bersamaan.

  • peserta([...]) menerima daftar AgentProtocol (disarankan) atau Pelaksana.

  • register_participants([...]) menerima daftar pabrik untuk AgentProtocol (disarankan)

    atau Pabrik pelaksana

  • build() kabel: dispatcher -> fan-out -> peserta -> fan-in -> agregator.

  • with_aggregator(...) mengambil alih agregator default dengan Eksekutor atau panggilan balik.

  • register_aggregator(...) menerima pabrik untuk Pelaksana sebagai agregator kustom.

Penggunaan:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Kelas yang berisi konteks apa pun yang harus diberikan ke model AI sebagaimana disediakan oleh ContextProvider.

Setiap ContextProvider memiliki kemampuan untuk memberikan konteksnya sendiri untuk setiap pemanggilan. Kelas Konteks berisi konteks tambahan yang disediakan oleh ContextProvider. Konteks ini akan dikombinasikan dengan konteks yang disediakan oleh penyedia lain sebelum diteruskan ke model AI. Konteks ini per pemanggilan, dan tidak akan disimpan sebagai bagian dari riwayat obrolan.

Buat objek Konteks baru.

ContextProvider

Kelas dasar untuk semua penyedia konteks.

Penyedia konteks adalah komponen yang dapat digunakan untuk meningkatkan manajemen konteks AI. Ini dapat mendengarkan perubahan dalam percakapan dan memberikan konteks tambahan ke model AI tepat sebelum pemanggilan.

Nota

ContextProvider adalah kelas dasar abstrak. Anda harus subkelas dan menerapkannya

metode invoking() untuk membuat penyedia konteks kustom. Idealnya, Anda harus

juga menerapkan metode invoked() dan thread_created() untuk melacak percakapan

negara bagian, tetapi ini bersifat opsional.

DataContent

Mewakili konten data biner dengan jenis media terkait (juga dikenal sebagai jenis MIME).

Penting

Ini untuk data biner yang direpresentasikan sebagai URI data, bukan untuk sumber daya online.

Gunakan UriContent untuk sumber daya online.

Menginisialisasi instans DataContent.

Penting

Ini untuk data biner yang direpresentasikan sebagai URI data, bukan untuk sumber daya online.

Gunakan UriContent untuk sumber daya online.

Default

Representasi runtime cabang default dalam grup switch-case.

Cabang default hanya dipanggil ketika tidak ada predikat kasus lain yang cocok. Dalam praktiknya dijamin ada sehingga perutean tidak pernah menghasilkan target kosong.

Edge

Memodelkan hand-off yang diarahkan secara opsional-kondisional antara dua pelaksana.

Setiap Edge mengambil metadata minimal yang diperlukan untuk memindahkan pesan dari satu pelaksana ke pelaksana lainnya di dalam grafik alur kerja. Ini secara opsional menyematkan predikat boolean yang memutuskan apakah tepi harus diambil pada runtime. Dengan membuat serial tepi ke primitif, kita dapat membangun kembali topologi alur kerja terlepas dari proses Python asli.

Menginisialisasi tepi yang sepenuhnya ditentukan antara dua pelaksana alur kerja.

EdgeDuplicationError

Pengecualian muncul saat tepi duplikat terdeteksi dalam alur kerja.

ErrorContent

Mewakili kesalahan.

Komentar: Biasanya digunakan untuk kesalahan non-fatal, di mana ada yang salah sebagai bagian dari operasi, tetapi operasi masih dapat dilanjutkan.

Menginisialisasi instans ErrorContent.

Executor

Kelas dasar untuk semua pelaksana alur kerja yang memproses pesan dan melakukan komputasi.

Gambaran Umum

Pelaksana adalah blok penyusun dasar alur kerja, mewakili unit pemrosesan individual yang menerima pesan, melakukan operasi, dan menghasilkan output. Setiap pelaksana diidentifikasi secara unik dan dapat menangani jenis pesan tertentu melalui metode handler yang didekorasi.

Sistem Tipe

Pelaksana memiliki sistem jenis kaya yang menentukan kemampuannya:

Jenis Input

Jenis pesan yang dapat diproses oleh eksekutor, ditemukan dari tanda tangan metode handler:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Akses melalui properti input_types .

Jenis Output

Jenis pesan yang dapat dikirim eksekutor ke pelaksana lain melalui ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Akses melalui properti output_types .

Jenis Output Alur Kerja

Jenis data yang dapat dipancarkan eksekutor sebagai output tingkat alur kerja melalui ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Akses melalui properti workflow_output_types .

Penemuan Handler

Pelaksana menemukan kemampuan mereka melalui metode yang didekorasi:

@handler Dekorator

Menandai metode yang memproses pesan masuk:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Intersepsi Permintaan Sub-alur kerja

Gunakan @handler metode untuk mencegat permintaan sub-alur kerja:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Jenis Konteks

Metode handler menerima varian WorkflowContext yang berbeda berdasarkan jenis anotasinya:

WorkflowContext (tanpa parameter jenis)

Untuk handler yang hanya melakukan efek samping tanpa mengirim pesan atau menghasilkan output:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Memungkinkan pengiriman pesan jenis T_Out melalui ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Memungkinkan pengiriman pesan (T_Out) dan menghasilkan output alur kerja (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Pelaksana Fungsi

Fungsi sederhana dapat dikonversi menjadi pelaksana menggunakan dekorator @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Komposisi sub-alur kerja

Pelaksana dapat berisi sub-alur kerja menggunakan WorkflowExecutor. Sub-alur kerja dapat membuat permintaan yang dapat disadap alur kerja induk. Lihat Dokumentasi WorkflowExecutor untuk detail tentang pola komposisi alur kerja dan penanganan permintaan/respons.

Pengelolaan Status

Pelaksana dapat berisi status yang bertahan di seluruh eksekusi alur kerja dan titik pemeriksaan. Ambil alih metode on_checkpoint_save dan on_checkpoint_restore untuk menerapkan serialisasi status kustom dan logika pemulihan.

Catatan Implementasi

  • Jangan panggil execute() secara langsung - dipanggil oleh mesin alur kerja
  • Jangan ambil alih execute() - tentukan handler menggunakan dekorator sebagai gantinya
  • Setiap pelaksana harus memiliki setidaknya satu metode @handler
  • Tanda tangan metode handler divalidasi pada waktu inisialisasi

Inisialisasi pelaksana dengan pengidentifikasi unik.

ExecutorCompletedEvent

Peristiwa dipicu ketika handler eksekutor selesai.

Inisialisasi peristiwa pelaksana dengan ID pelaksana dan data opsional.

ExecutorEvent

Kelas dasar untuk peristiwa pelaksana.

Inisialisasi peristiwa pelaksana dengan ID pelaksana dan data opsional.

ExecutorFailedEvent

Peristiwa dipicu ketika handler eksekutor menimbulkan kesalahan.

ExecutorInvokedEvent

Peristiwa dipicu ketika handler eksekutor dipanggil.

Inisialisasi peristiwa pelaksana dengan ID pelaksana dan data opsional.

FanInEdgeGroup

Mewakili sekumpulan tepi konvergensi yang memberi umpan pada satu pelaksana hilir.

Grup fan-in biasanya digunakan ketika beberapa tahap upstream secara independen menghasilkan pesan yang semuanya harus tiba di prosesor hilir yang sama.

Buat pemetaan fan-in yang menggabungkan beberapa sumber ke dalam satu target.

FanOutEdgeGroup

Mewakili grup tepi gaya siaran dengan logika pemilihan opsional.

Fan-out meneruskan pesan yang dihasilkan oleh satu pelaksana sumber ke satu atau beberapa pelaksana hilir. Pada runtime kami dapat mempersempit target lebih lanjut dengan menjalankan selection_func yang memeriksa payload dan mengembalikan subset id yang harus menerima pesan.

Buat pemetaan fan-out dari satu sumber ke banyak target.

FileCheckpointStorage

Penyimpanan titik pemeriksaan berbasis file untuk persistensi.

Menginisialisasi penyimpanan file.

FinishReason

Mewakili alasan respons obrolan selesai.

Inisialisasi FinishReason dengan nilai.

FunctionApprovalRequestContent

Mewakili permintaan persetujuan pengguna dari panggilan fungsi.

Menginisialisasi instans FunctionApprovalRequestContent.

FunctionApprovalResponseContent

Mewakili respons untuk persetujuan pengguna atas panggilan fungsi.

Menginisialisasi instans FunctionApprovalResponseContent.

FunctionCallContent

Mewakili permintaan panggilan fungsi.

Menginisialisasi instans FunctionCallContent.

FunctionExecutor

Pelaksana yang membungkus fungsi yang ditentukan pengguna.

Pelaksana ini memungkinkan pengguna untuk menentukan fungsi sederhana (sinkronisasi dan asinkron) dan menggunakannya sebagai pelaksana alur kerja tanpa perlu membuat kelas pelaksana penuh.

Fungsi sinkron dijalankan dalam kumpulan utas menggunakan asyncio.to_thread() untuk menghindari pemblokiran perulangan peristiwa.

Inisialisasi FunctionExecutor dengan fungsi yang ditentukan pengguna.

FunctionInvocationConfiguration

Konfigurasi untuk pemanggilan fungsi di klien obrolan.

Kelas ini dibuat secara otomatis pada setiap klien obrolan yang mendukung pemanggilan fungsi. Ini berarti bahwa untuk sebagian besar kasus Anda hanya dapat mengubah atribut pada instans, daripada membuat yang baru.

Inisialisasi FunctionInvocationConfiguration.

FunctionInvocationContext

Objek konteks untuk pemanggilan middleware fungsi.

Konteks ini diteruskan melalui alur middleware fungsi dan berisi semua informasi tentang pemanggilan fungsi.

Inisialisasi FunctionInvocationContext.

FunctionMiddleware

Kelas dasar abstrak untuk middleware fungsi yang dapat mencegat pemanggilan fungsi.

Middleware fungsi memungkinkan Anda untuk mencegat dan memodifikasi pemanggilan fungsi/alat sebelum dan sesudah eksekusi. Anda dapat memvalidasi argumen, hasil cache, pemanggilan log, atau mengganti eksekusi fungsi.

Nota

FunctionMiddleware adalah kelas dasar abstrak. Anda harus subkelas dan menerapkannya

metode process() untuk membuat middleware fungsi kustom.

FunctionResultContent

Mewakili hasil panggilan fungsi.

Menginisialisasi instans FunctionResultContent.

GraphConnectivityError

Pengecualian muncul saat masalah konektivitas grafik terdeteksi.

GroupChatBuilder

Penyusun tingkat tinggi untuk alur kerja obrolan grup yang diarahkan manajer dengan orkestrasi dinamis.

GroupChat mengoordinasikan percakapan multi-agen menggunakan manajer yang memilih peserta mana yang berbicara berikutnya. Manajer dapat menjadi fungsi Python sederhana (set_select_speakers_func) atau pemilih berbasis agen melalui set_manager. Kedua pendekatan ini saling eksklusif.

Alur Kerja Inti:

  1. Menentukan peserta: daftar agen (menggunakan nama .name) atau nama pemetaan dict ke agen

  2. Mengonfigurasi pemilihan pembicara: set_select_speakers_func OR

    set_manager (bukan keduanya)

  3. Opsional: atur batas putaran, titik pemeriksaan, kondisi penghentian

  4. Membangun dan menjalankan alur kerja

Pola Pemilihan Pembicara:

Pola 1: Pilihan berbasis fungsi sederhana (disarankan)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Pola 2: Pilihan berbasis LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Pola 3: Meminta info untuk umpan balik pertengahan percakapan


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Spesifikasi Peserta:

Dua cara untuk menentukan peserta:

  • Formulir daftar: [agent1, agent2] - menggunakan atribut agent.name untuk nama peserta
  • Formulir dict: {name1: agent1, name2: agent2} - kontrol nama eksplisit
  • Formulir kata kunci: peserta(name1=agent1, name2=agent2) - kontrol nama eksplisit

Struktur Rekam Jepret Status:

GroupChatStateSnapshot yang diteruskan ke set_select_speakers_func berisi:

  • tugas: ChatMessage - Tugas pengguna asli
  • peserta: dict[str, str] - Pemetaan nama peserta ke deskripsi
  • percakapan: tuple[ChatMessage, ...] - Riwayat percakapan lengkap
  • history: tuple[GroupChatTurn, ...] - Rekaman turn-by-turn dengan atribusi pembicara
  • round_index: int - Jumlah putaran pemilihan manajer sejauh ini
  • pending_agent: str | None - Nama agen yang sedang diproses (jika ada)

Batasan Penting:

  • Tidak dapat menggabungkan set_select_speakers_func dan set_manager
  • Nama peserta harus unik
  • Saat menggunakan formulir daftar, agen harus memiliki atribut nama yang tidak kosong

Inisialisasi GroupChatBuilder.

GroupChatDirective

Instruksi yang dipancarkan oleh implementasi manajer obrolan grup.

HandoffBuilder

Penyusun yang fasih untuk alur kerja handoff percakapan dengan agen koordinator dan spesialis.

Pola handoff memungkinkan agen koordinator untuk merutekan permintaan ke agen spesialis. Mode interaksi mengontrol apakah alur kerja meminta input pengguna setelah setiap respons agen atau selesai secara otonom setelah agen selesai merespons. Kondisi penghentian menentukan kapan alur kerja harus berhenti meminta input dan selesai.

Pola Perutean:

Single-Tier (Default): Hanya koordinator yang bisa menyerahkan ke spesialis. Secara default, setelah spesialis merespons, kontrol kembali ke pengguna untuk input lebih lanjut. Ini menciptakan alur siklus: pengguna - koordinator -> [spesialis opsional] - pengguna ->> koordinator -> ...> Gunakan with_interaction_mode("otonom") untuk melewati permintaan input pengguna tambahan dan menghasilkan percakapan akhir saat agen merespons tanpa mendelegasikan.

Multi-Tingkat (Tingkat Lanjut): Spesialis dapat menyerahkan kepada spesialis lain menggunakan .add_handoff(). Ini memberikan lebih banyak fleksibilitas untuk alur kerja yang kompleks tetapi kurang dapat dikontrol daripada pola tingkat tunggal. Pengguna kehilangan visibilitas real-time ke dalam langkah-langkah menengah selama handoff spesialis-ke-spesialis (meskipun riwayat percakapan lengkap termasuk semua handoff dipertahankan dan dapat diperiksa setelahnya).

Fitur Utama:

  • Deteksi handoff otomatis: Koordinator memanggil alat handoff yang

    argumen (misalnya {"handoff_to": "shipping_agent"}) mengidentifikasi spesialis untuk menerima kontrol.

  • Alat yang dihasilkan secara otomatis: Secara default penyusun mensintesis alat handoff_to_<agent> untuk koordinator, sehingga Anda tidak menentukan fungsi tempat penampung secara manual.

  • Riwayat percakapan lengkap: Seluruh percakapan (termasuk ChatMessage.additional_properties) dipertahankan dan diteruskan ke setiap agen.

  • Kontrol penghentian: Secara default, berakhir setelah 10 pesan pengguna. Ambil alih dengan .with_termination_condition(lambda conv: ...) untuk logika kustom (misalnya, deteksi "selamat tinggal").

  • Mode interaksi: Pilih human_in_loop (default) untuk meminta pengguna di antara agen berbalik, atau otonom untuk terus merutekan kembali ke agen tanpa meminta input pengguna hingga handoff terjadi atau batas penghentian/giliran tercapai (batas giliran otonom default: 50).

  • Titik pemeriksaan: Persistensi opsional untuk alur kerja yang dapat dilanjutkan.

Penggunaan (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Perutean Multi-Tingkat dengan .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Gunakan Pabrik Peserta untuk Isolasi Status:

Kondisi Penghentian Kustom:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Titik pemeriksaan:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Menginisialisasi HandoffBuilder untuk membuat alur kerja handoff percakapan.

Penyusun dimulai dalam status tidak dikonfigurasi dan mengharuskan Anda untuk memanggil:

  1. .participants([...]) - Mendaftarkan agen
  2. atau .participant_factories({...}) - Mendaftarkan pabrik agen/pelaksana
  3. .set_coordinator(...) - Menunjuk agen mana yang menerima input pengguna awal
  4. .build() - Membangun Alur Kerja akhir

Metode konfigurasi opsional memungkinkan Anda menyesuaikan manajemen konteks, logika penghentian, dan persistensi.

Nota

Peserta harus memiliki nama/id stabil karena alur kerja memetakan

argumen alat handoff untuk pengidentifikasi ini. Nama agen harus cocok

string yang dipancarkan oleh alat handoff koordinator (misalnya, alat yang

output {"handoff_to": "billing"} memerlukan agen bernama billing).

HandoffUserInputRequest

Pesan permintaan dipancarkan saat alur kerja memerlukan input pengguna baru.

Catatan: Bidang percakapan sengaja dikecualikan dari serialisasi titik pemeriksaan untuk mencegah duplikasi. Percakapan dipertahankan dalam status koordinator dan akan direkonstruksi saat pemulihan. Lihat masalah #2667.

HostedCodeInterpreterTool

Mewakili alat yang dihosting yang dapat ditentukan ke layanan AI untuk memungkinkannya menjalankan kode yang dihasilkan.

Alat ini tidak menerapkan interpretasi kode itu sendiri. Ini berfungsi sebagai penanda untuk menginformasikan layanan bahwa layanan diizinkan untuk menjalankan kode yang dihasilkan jika layanan mampu melakukannya.

Inisialisasi HostedCodeInterpreterTool.

HostedFileContent

Mewakili konten file yang dihosting.

Menginisialisasi instans HostedFileContent.

HostedFileSearchTool

Mewakili alat pencarian file yang dapat ditentukan ke layanan AI untuk memungkinkannya melakukan pencarian file.

Menginisialisasi FileSearchTool.

HostedMCPSpecificApproval

Mewakili mode khusus untuk alat yang dihosting.

Saat menggunakan mode ini, pengguna harus menentukan alat mana yang selalu atau tidak pernah memerlukan persetujuan. Ini direpresentasikan sebagai kamus dengan dua kunci opsional:

HostedMCPTool

Mewakili alat MCP yang dikelola dan dijalankan oleh layanan.

Buat alat MCP yang dihosting.

HostedVectorStoreContent

Mewakili konten penyimpanan vektor yang dihosting.

Menginisialisasi instans HostedVectorStoreContent.

HostedWebSearchTool

Mewakili alat pencarian web yang dapat ditentukan ke layanan AI untuk memungkinkannya melakukan pencarian web.

Menginisialisasi HostedWebSearchTool.

InMemoryCheckpointStorage

Penyimpanan titik pemeriksaan dalam memori untuk pengujian dan pengembangan.

Menginisialisasi penyimpanan memori.

InProcRunnerContext

Konteks eksekusi dalam proses untuk eksekusi lokal dan titik pemeriksaan opsional.

Menginisialisasi konteks eksekusi dalam proses.

MCPStdioTool

Alat MCP untuk menyambungkan ke server MCP berbasis stdio.

Kelas ini terhubung ke server MCP yang berkomunikasi melalui input/output standar, biasanya digunakan untuk proses lokal.

Inisialisasi alat stdio MCP.

Nota

Argumen digunakan untuk membuat objek StdioServerParameters,

yang kemudian digunakan untuk membuat klien stdio. Lihat mcp.client.stdio.stdio_client

dan mcp.client.stdio.stdio_server_parameters untuk detail selengkapnya.

MCPStreamableHTTPTool

Alat MCP untuk menyambungkan ke server MCP berbasis HTTP.

Kelas ini terhubung ke server MCP yang berkomunikasi melalui HTTP/SSE yang dapat dialirkan.

Inisialisasi alat HTTP yang dapat dialirkan MCP.

Nota

Argumen digunakan untuk membuat klien HTTP yang dapat dialirkan.

Lihat mcp.client.streamable_http.streamablehttp_client untuk detail selengkapnya.

Setiap argumen tambahan yang diteruskan ke konstruktor akan diteruskan ke

konstruktor klien HTTP yang dapat dialirkan.

MCPWebsocketTool

Alat MCP untuk menyambungkan ke server MCP berbasis WebSocket.

Kelas ini terhubung ke server MCP yang berkomunikasi melalui WebSocket.

Inisialisasi alat MCP WebSocket.

Nota

Argumen digunakan untuk membuat klien WebSocket.

Lihat mcp.client.websocket.websocket_client untuk detail selengkapnya.

Setiap argumen tambahan yang diteruskan ke konstruktor akan diteruskan ke

Konstruktor klien WebSocket.

MagenticBuilder

Penyusun yang fasih untuk membuat alur kerja orkestrasi multi-agen Magentic One.

Alur kerja Magentic One menggunakan manajer bertenaga LLM untuk mengoordinasikan beberapa agen melalui perencanaan tugas dinamis, pelacakan kemajuan, dan replanning adaptif. Manajer membuat rencana, memilih agen, memantau kemajuan, dan menentukan kapan harus melakukan replan atau menyelesaikannya.

Penyusun menyediakan API yang fasih untuk mengonfigurasi peserta, manajer, tinjauan paket opsional, titik pemeriksaan, dan panggilan balik peristiwa.

Dukungan Human-in-the-loop: Magentic menyediakan mekanisme HITL khusus melalui:

  • .with_plan_review() - Meninjau dan menyetujui/merevisi rencana sebelum eksekusi

  • .with_human_input_on_stall() - Intervensi saat alur kerja mengintai

  • Persetujuan alat melalui FunctionApprovalRequestContent - Menyetujui panggilan alat individual

Ini memancarkan peristiwa MagenticHumanInterventionRequest yang menyediakan opsi keputusan terstruktur (SETUJUI, REVISE, LANJUTKAN, REPLAN, PANDUAN) yang sesuai untuk orkestrasi berbasis perencanaan Magentic.

Penggunaan:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

Dengan manajer kustom:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Konteks untuk manajer Magentic.

MagenticManagerBase

Kelas dasar untuk manajer Magentic One.

ManagerDirectiveModel

Model Pydantic untuk output arahan manajer terstruktur.

Buat model baru dengan mengurai dan memvalidasi data input dari argumen kata kunci.

Menaikkan [ValidationError][pydantic_core. ValidationError] jika data input tidak dapat divalidasi untuk membentuk model yang valid.

diri sendiri secara eksplisit hanya posisional untuk memungkinkan diri sebagai nama bidang.

ManagerSelectionRequest

Permintaan dikirim ke agen manajer untuk pemilihan pembicara berikutnya.

Dataclass ini mengemas status percakapan lengkap dan konteks tugas bagi agen manajer untuk menganalisis dan membuat keputusan pemilihan pembicara.

ManagerSelectionResponse

Respons dari agen manajer dengan keputusan pemilihan pembicara.

Agen manajer harus menghasilkan struktur ini (atau dict/JSON yang kompatibel) untuk mengomunikasikan keputusannya kembali ke orkestrator.

Buat model baru dengan mengurai dan memvalidasi data input dari argumen kata kunci.

Menaikkan [ValidationError][pydantic_core. ValidationError] jika data input tidak dapat divalidasi untuk membentuk model yang valid.

diri sendiri secara eksplisit hanya posisional untuk memungkinkan diri sebagai nama bidang.

Message

Kelas yang mewakili pesan dalam alur kerja.

OrchestrationState

Kontainer status terpadu untuk titik pemeriksaan orkestrator.

Kelas data ini menstandarkan serialisasi titik pemeriksaan di ketiga pola obrolan grup sambil memungkinkan ekstensi khusus pola melalui metadata.

Atribut umum mencakup masalah orkestrasi bersama (tugas, percakapan, pelacakan bulat). Status khusus pola masuk ke dikte metadata.

RequestInfoEvent

Peristiwa dipicu saat pelaksana alur kerja meminta informasi eksternal.

Menginisialisasi peristiwa info permintaan.

RequestInfoInterceptor

Pelaksana internal yang menjeda alur kerja untuk input manusia sebelum agen berjalan.

Pelaksana ini dimasukkan ke dalam grafik alur kerja oleh penyusun saat .with_request_info() dipanggil. Ini mencegat pesan AgentExecutorRequest SEBELUM agen berjalan dan menjeda alur kerja melalui ctx.request_info() dengan AgentInputRequest.

Saat respons diterima, handler respons menyuntikkan input sebagai pesan pengguna ke dalam percakapan dan meneruskan permintaan ke agen.

Parameter agent_filter opsional memungkinkan pembatasan agen mana yang memicu jeda. Jika ID agen target tidak berada dalam set filter, permintaan akan diteruskan tanpa jeda.

Menginisialisasi eksekutor pencegat info permintaan.

Role

Menjelaskan tujuan yang dimaksudkan dari pesan dalam interaksi obrolan.

Properti: SYSTEM: Peran yang menginstruksikan atau mengatur perilaku sistem AI. PENGGUNA: Peran yang menyediakan input pengguna untuk interaksi obrolan. ASISTEN: Peran yang memberikan respons terhadap input yang diinstruksikan sistem dan diminta pengguna. ALAT: Peran yang menyediakan informasi dan referensi tambahan sebagai respons terhadap permintaan penggunaan alat.

Menginisialisasi Peran dengan nilai.

Runner

Kelas untuk menjalankan alur kerja di supersteps Pregel.

Inisialisasi runner dengan tepi, status bersama, dan konteks.

RunnerContext

Protokol untuk konteks eksekusi yang digunakan oleh runner.

Konteks tunggal yang mendukung olahpesan, peristiwa, dan titik pemeriksaan opsional. Jika penyimpanan titik pemeriksaan tidak dikonfigurasi, metode titik pemeriksaan dapat muncul.

SequentialBuilder

Penyusun tingkat tinggi untuk alur kerja agen/pelaksana berurutan dengan konteks bersama.

  • peserta([...]) menerima daftar instans AgentProtocol (disarankan) atau Pelaksana

  • register_participants([...]) menerima daftar pabrik untuk AgentProtocol (disarankan)

    atau Pabrik pelaksana

  • Pelaksana harus menentukan handler yang menggunakan daftar[ChatMessage] dan mengirimkan daftar[ChatMessage]

  • Alur kerja menghubungkan peserta secara berurutan, meneruskan daftar[ChatMessage] ke bawah rantai

  • Agen menambahkan pesan asisten mereka ke percakapan

  • Pelaksana kustom dapat mengubah/meringkas dan mengembalikan daftar[ChatMessage]

  • Output akhir adalah percakapan yang dihasilkan oleh peserta terakhir

Penggunaan:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Kelas untuk mengelola status bersama dalam alur kerja.

SharedState menyediakan akses aman utas ke data status alur kerja yang perlu dibagikan di seluruh pelaksana selama eksekusi alur kerja.

Kunci yang Dipesan: Kunci berikut dicadangkan untuk penggunaan kerangka kerja internal dan tidak boleh dimodifikasi oleh kode pengguna:

  • _executor_state: Menyimpan status eksekutor untuk titik pemeriksaan (dikelola oleh Runner)

Peringatan

Jangan gunakan kunci yang dimulai dengan garis bawah (_) karena dapat dicadangkan untuk

operasi kerangka kerja internal.

Menginisialisasi status bersama.

SingleEdgeGroup

Pembungkus kenyamanan untuk tepi soliter, menjaga seragam API grup.

Buat grup tepi satu ke satu di antara dua pelaksana.

StandardMagenticManager

Manajer Magentic Standar yang melakukan panggilan LLM nyata melalui ChatAgent.

Manajer membuat permintaan yang mencerminkan orkestrasi Magentic One asli:

  • Pengumpulan fakta
  • Pembuatan rencana
  • Ledger kemajuan di JSON
  • Pembaruan fakta dan pembaruan paket saat reset
  • Sintesis jawaban akhir

Inisialisasi Manajer Magentik Standar.

SubWorkflowRequestMessage

Pesan yang dikirim dari sub-alur kerja ke pelaksana dalam alur kerja induk untuk meminta informasi.

Pesan ini membungkus RequestInfoEvent yang dipancarkan oleh pelaksana dalam sub-alur kerja.

SubWorkflowResponseMessage

Pesan yang dikirim dari alur kerja induk ke sub-alur kerja melalui WorkflowExecutor untuk memberikan informasi yang diminta.

Pesan ini membungkus data respons bersama dengan RequestInfoEvent asli yang dipancarkan oleh eksekutor sub-alur kerja.

SuperStepCompletedEvent

Peristiwa dipicu saat superstep berakhir.

Inisialisasi peristiwa superstep.

SuperStepStartedEvent

Peristiwa dipicu saat superstep dimulai.

Inisialisasi peristiwa superstep.

SwitchCaseEdgeGroup

Varian fan-out yang meniadakan alur kontrol sakelar/kasus tradisional.

Setiap kasus memeriksa payload pesan dan memutuskan apakah harus menangani pesan. Tepat satu kasus atau cabang default mengembalikan target pada runtime, mempertahankan semantik pengiriman tunggal.

Konfigurasikan struktur perutean switch/case untuk satu pelaksana sumber.

SwitchCaseEdgeGroupCase

Deskripsi yang dapat dipertahankan dari satu cabang kondisional dalam switch-case.

Tidak seperti objek Kasus runtime varian serialisable ini hanya menyimpan pengidentifikasi target dan nama deskriptif untuk predikat. Ketika panggilan yang mendasar tidak tersedia selama deserialisasi, kami mengganti tempat penampung proksi yang gagal dengan keras, memastikan dependensi yang hilang segera terlihat.

Rekam metadata perutean untuk cabang kasus bersyar.

SwitchCaseEdgeGroupDefault

Deskriptor yang dapat dipertahankan untuk cabang fallback dari grup switch-case.

Cabang default dijamin ada dan dipanggil ketika setiap predikat kasus lainnya gagal mencocokkan payload.

Arahkan cabang default ke pengidentifikasi pelaksana yang diberikan.

TextContent

Mewakili konten teks dalam obrolan.

Menginisialisasi instans TextContent.

TextReasoningContent

Mewakili konten penalaran teks dalam obrolan.

Komentar: Kelas ini dan TextContent sangat mirip, tetapi berbeda.

Menginisialisasi instans TextReasoningContent.

TextSpanRegion

Mewakili wilayah teks yang telah diannotasi.

Menginisialisasi TextSpanRegion.

ToolMode

Menentukan apakah dan bagaimana alat digunakan dalam permintaan obrolan.

Inisialisasi ToolMode.

ToolProtocol

Mewakili alat generik.

Protokol ini mendefinisikan antarmuka yang harus diterapkan semua alat agar kompatibel dengan kerangka kerja agen. Ini diimplementasikan oleh berbagai kelas alat seperti HostedMCPTool, HostedWebSearchTool, dan AIFunction. AIFunction biasanya dibuat oleh dekorator ai_function .

Karena setiap konektor perlu mengurai alat secara berbeda, pengguna dapat meneruskan dikte untuk menentukan alat khusus layanan ketika tidak ada abstraksi yang tersedia.

TypeCompatibilityError

Pengecualian dimunculkan ketika ketidaksesuaian jenis terdeteksi di antara pelaksana yang terhubung.

UriContent

Mewakili konten URI.

Penting

Ini digunakan untuk konten yang diidentifikasi oleh URI, seperti gambar atau file.

Untuk URI data (biner), gunakan DataContent sebagai gantinya.

Menginisialisasi instans UriContent.

Keterangan: Ini digunakan untuk konten yang diidentifikasi oleh URI, seperti gambar atau file. Untuk URI data (biner), gunakan DataContent sebagai gantinya.

UsageContent

Mewakili informasi penggunaan yang terkait dengan permintaan dan respons obrolan.

Menginisialisasi instans UsageContent.

UsageDetails

Menyediakan detail penggunaan tentang permintaan/respons.

Menginisialisasi instans UsageDetails.

Workflow

Mesin eksekusi berbasis grafik yang mengatur eksekutor yang terhubung.

Gambaran Umum

Alur kerja menjalankan grafik eksekutor yang diarahkan yang terhubung melalui grup tepi menggunakan model seperti Pregel, berjalan di supersteps hingga grafik menjadi menganggur. Alur kerja dibuat menggunakan kelas WorkflowBuilder - jangan membuat instans kelas ini secara langsung.

Model Eksekusi

Pelaksana berjalan di supersteps yang disinkronkan di mana setiap pelaksana:

  • Dipanggil saat menerima pesan dari grup tepi yang tersambung
  • Dapat mengirim pesan ke pelaksana hilir melalui ctx.send_message()
  • Dapat menghasilkan output tingkat alur kerja melalui ctx.yield_output()
  • Dapat memancarkan peristiwa kustom melalui ctx.add_event()

Pesan antar eksekutor dikirimkan di akhir setiap superstep dan tidak terlihat di aliran peristiwa. Hanya peristiwa tingkat alur kerja (output, peristiwa kustom) dan peristiwa status yang dapat diamati oleh pemanggil.

Jenis Input/Output

Jenis alur kerja ditemukan saat runtime dengan memeriksa:

  • Jenis input: Dari jenis input pelaksana awal
  • Jenis output: Penyatuan semua jenis output alur kerja pelaksana Mengakses ini melalui properti input_types dan output_types.

Metode Eksekusi

Alur kerja menyediakan dua API eksekusi utama, masing-masing mendukung beberapa skenario:

  • run(): Jalankan ke penyelesaian, mengembalikan WorkflowRunResult dengan semua peristiwa

  • run_stream(): Mengembalikan peristiwa menghasilkan generator asinkron saat terjadi

Kedua metode mendukung:

  • Alur kerja awal berjalan: Berikan parameter pesan
  • Pemulihan titik pemeriksaan: Menyediakan checkpoint_id (dan secara opsional checkpoint_storage)
  • Kelanjutan HIL: Berikan respons untuk melanjutkan setelah permintaan RequestInfoExecutor
  • Titik pemeriksaan runtime: Menyediakan checkpoint_storage untuk mengaktifkan/menimpa titik pemeriksaan untuk eksekusi ini

Pengelolaan Status

Instans alur kerja berisi status dan status dipertahankan di seluruh panggilan untuk dijalankan dan run_stream. Untuk menjalankan beberapa eksekusi independen, buat instans Alur Kerja terpisah melalui WorkflowBuilder.

Permintaan Input Eksternal

Pelaksana dalam alur kerja dapat meminta input eksternal menggunakan ctx.request_info():

  1. Pelaksana memanggil ctx.request_info() untuk meminta input
  2. Pelaksana mengimplementasikan response_handler() untuk memproses respons
  3. Permintaan dipancarkan sebagai instans RequestInfoEvent dalam aliran peristiwa
  4. Alur kerja memasuki status IDLE_WITH_PENDING_REQUESTS
  5. Penelepon menangani permintaan dan memberikan respons melalui metode send_responses atau send_responses_streaming
  6. Respons dirutekan ke pelaksana yang meminta dan penangan respons dipanggil

Titik Pemeriksaan

Titik pemeriksaan dapat dikonfigurasi pada waktu build atau runtime:

Build-time (melalui WorkflowBuilder): alur kerja = WorkflowBuilder().with_checkpointing(storage).build()

Runtime (melalui parameter run/run_stream): result = await workflow.run(message, checkpoint_storage=runtime_storage)

Saat diaktifkan, titik pemeriksaan dibuat di akhir setiap superstep, menangkap:

  • Status pelaksana
  • Pesan saat transit
  • Alur Kerja status bersama dapat dijeda dan dilanjutkan di seluruh proses dimulai ulang menggunakan penyimpanan titik pemeriksaan.

Komposisi

Alur kerja dapat ditumpuk menggunakan WorkflowExecutor, yang membungkus alur kerja anak sebagai pelaksana. Jenis input/output alur kerja berlapis menjadi bagian dari jenis WorkflowExecutor. Saat dipanggil, WorkflowExecutor menjalankan alur kerja berlapis untuk menyelesaikan dan memproses outputnya.

Menginisialisasi alur kerja dengan daftar tepi.

WorkflowAgent

Subkelas Agen yang membungkus alur kerja dan mengeksposnya sebagai agen.

Inisialisasi WorkflowAgent.

WorkflowBuilder

Kelas penyusun untuk membangun alur kerja.

Kelas ini menyediakan API yang fasih untuk menentukan grafik alur kerja dengan menghubungkan pelaksana dengan tepi dan mengonfigurasi parameter eksekusi. Panggilan build untuk membuat instans yang tidak dapat diubah Workflow .

Inisialisasi WorkflowBuilder dengan daftar tepi kosong dan tidak ada pelaksana awal.

WorkflowCheckpoint

Mewakili titik pemeriksaan lengkap status alur kerja.

Titik pemeriksaan menangkap status eksekusi penuh alur kerja pada titik tertentu, memungkinkan alur kerja dijeda dan dilanjutkan.

Nota

Dict shared_state mungkin berisi kunci cadangan yang dikelola oleh kerangka kerja.

Lihat dokumentasi kelas SharedState untuk detail tentang kunci yang dipesan.

WorkflowCheckpointSummary

Ringkasan titik pemeriksaan alur kerja yang dapat dibaca manusia.

WorkflowContext

Konteks eksekusi yang memungkinkan pelaksana berinteraksi dengan alur kerja dan pelaksana lainnya.

Gambaran Umum

WorkflowContext menyediakan antarmuka terkontrol bagi pelaksana untuk mengirim pesan, menghasilkan output, mengelola status, dan berinteraksi dengan ekosistem alur kerja yang lebih luas. Ini memberlakukan keamanan jenis melalui parameter generik sambil mencegah akses langsung ke komponen runtime internal.

Jenis parameter

Konteks diparameterkan untuk memberlakukan keamanan jenis untuk operasi yang berbeda:

WorkflowContext (tanpa parameter)

Untuk pelaksana yang hanya melakukan efek samping tanpa mengirim pesan atau menghasilkan output:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Memungkinkan pengiriman pesan jenis T_Out ke pelaksana lain:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Memungkinkan pengiriman pesan (T_Out) dan menghasilkan output alur kerja (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Jenis Union

Beberapa jenis dapat ditentukan menggunakan notasi gabungan:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Inisialisasi konteks pelaksana dengan konteks alur kerja yang diberikan.

WorkflowErrorDetails

Informasi kesalahan terstruktur untuk muncul dalam peristiwa/hasil kesalahan.

WorkflowEvent

Kelas dasar untuk peristiwa alur kerja.

Menginisialisasi peristiwa alur kerja dengan data opsional.

WorkflowExecutor

Pelaksana yang membungkus alur kerja untuk mengaktifkan komposisi alur kerja hierarkis.

Gambaran Umum

WorkflowExecutor membuat alur kerja berperilaku sebagai pelaksana tunggal dalam alur kerja induk, memungkinkan arsitektur alur kerja berlapis. Ini menangani siklus hidup lengkap eksekusi sub-alur kerja termasuk pemrosesan peristiwa, penerusan output, dan koordinasi permintaan/respons antara alur kerja induk dan anak.

Model Eksekusi

Saat dipanggil, WorkflowExecutor:

  1. Memulai alur kerja yang dibungkus dengan pesan input
  2. Menjalankan sub-alur kerja ke penyelesaian atau hingga memerlukan input eksternal
  3. Memproses aliran peristiwa lengkap sub-alur kerja setelah eksekusi
  4. Meneruskan output ke alur kerja induk sebagai pesan
  5. Menangani permintaan eksternal dengan merutekannya ke alur kerja induk
  6. Mengakumulasi respons dan melanjutkan eksekusi sub-alur kerja

Pemrosesan Aliran Peristiwa

WorkflowExecutor memproses peristiwa setelah penyelesaian sub-alur kerja:

Penerusan Output

Semua output dari sub-alur kerja secara otomatis diteruskan ke induk:

Ketika allow_direct_output False (default):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Ketika allow_direct_output True:

Koordinasi Permintaan/Respons

Saat sub-alur kerja memerlukan informasi eksternal:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Pengelolaan Status

WorkflowExecutor mempertahankan status eksekusi di seluruh siklus permintaan/respons:

  • Melacak permintaan yang tertunda berdasarkan request_id
  • Mengakumulasi respons hingga semua respons yang diharapkan diterima
  • Melanjutkan eksekusi sub-alur kerja dengan batch respons lengkap
  • Menangani eksekusi bersamaan dan beberapa permintaan yang tertunda

Ketik Integrasi Sistem

WorkflowExecutor mewarisi tanda tangan jenisnya dari alur kerja yang dibungkus:

Jenis Input

Cocok dengan jenis input pelaksana mulai alur kerja yang dibungkus:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Jenis Output

Menggabungkan output sub-alur kerja dengan jenis koordinasi permintaan:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Penanganan Kesalahan

WorkflowExecutor menyebarkan kegagalan sub-alur kerja:

  • Mengambil WorkflowFailedEvent dari sub-alur kerja
  • Mengonversi ke WorkflowErrorEvent dalam konteks induk
  • Menyediakan informasi kesalahan terperinci termasuk ID sub-alur kerja

Dukungan Eksekusi Bersamaan

WorkflowExecutor sepenuhnya mendukung beberapa eksekusi sub-alur kerja bersamaan:

Isolasi Status Per-Execution

Setiap pemanggilan sub-alur kerja membuat ExecutionContext terisolasi:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Koordinasi Permintaan/Respons

Respons dirutekan dengan benar ke eksekusi asal:

  • Setiap eksekusi melacak permintaannya sendiri yang tertunda dan respons yang diharapkan
  • Pemetaan permintaan ke eksekusi memastikan respons mencapai sub-alur kerja yang benar
  • Akumulasi respons diisolasi per eksekusi
  • Pembersihan otomatis saat eksekusi selesai

Manajemen Memori

  • Eksekusi bersamaan tanpa batas didukung
  • Setiap eksekusi memiliki identifikasi berbasis UUID yang unik
  • Pembersihan konteks eksekusi yang telah selesai
  • Manajemen status aman utas untuk akses bersamaan

Pertimbangan Penting

Instans Alur Kerja Bersama: Semua eksekusi bersamaan menggunakan instans alur kerja yang mendasar yang sama. Untuk isolasi yang tepat, pastikan bahwa alur kerja yang dibungkus dan pelaksananya tidak memiliki status.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Integrasi dengan Alur Kerja Induk

Alur kerja induk dapat mencegat permintaan sub-alur kerja:

Catatan Implementasi

  • Sub-alur kerja berjalan hingga selesai sebelum memproses hasilnya
  • Pemrosesan peristiwa bersifat atom - semua output diteruskan sebelum permintaan
  • Akumulasi respons memastikan sub-alur kerja menerima batch respons lengkap
  • Status eksekusi dipertahankan untuk dimulai kembali yang tepat setelah permintaan eksternal
  • Eksekusi bersamaan sepenuhnya terisolasi dan tidak mengganggu satu sama lain

Inisialisasi WorkflowExecutor.

WorkflowFailedEvent

Peristiwa siklus hidup bawaan yang dipancarkan saat alur kerja berjalan berakhir dengan kesalahan.

WorkflowOutputEvent

Peristiwa dipicu ketika pelaksana alur kerja menghasilkan output.

Menginisialisasi peristiwa output alur kerja.

WorkflowRunResult

Kontainer untuk peristiwa yang dihasilkan selama eksekusi alur kerja non-streaming.

Gambaran Umum

Mewakili hasil eksekusi lengkap dari eksekusi alur kerja, yang berisi semua peristiwa yang dihasilkan dari status awal hingga diam. Alur kerja menghasilkan output secara bertahap melalui panggilan ctx.yield_output() selama eksekusi.

Struktur Peristiwa

Mempertahankan pemisahan antara data-plane dan peristiwa sarana kontrol:

  • Peristiwa bidang data: Pemanggilan pelaksana, penyelesaian, output, dan permintaan (dalam daftar utama)
  • Peristiwa sarana kontrol: Garis waktu status dapat diakses melalui metode status_timeline()

Metode Kunci

  • get_outputs(): Mengekstrak semua output alur kerja dari eksekusi
  • get_request_info_events(): Mengambil permintaan input eksternal yang dibuat selama eksekusi
  • get_final_state(): Dapatkan status alur kerja akhir (IDLE, IDLE_WITH_PENDING_REQUESTS, dll.)
  • status_timeline(): Mengakses riwayat peristiwa status lengkap
WorkflowStartedEvent

Peristiwa siklus hidup bawaan yang dipancarkan saat alur kerja berjalan dimulai.

Menginisialisasi peristiwa alur kerja dengan data opsional.

WorkflowStatusEvent

Peristiwa siklus hidup bawaan yang dipancarkan untuk transisi status eksekusi alur kerja.

Inisialisasi peristiwa status alur kerja dengan status baru dan data opsional.

WorkflowValidationError

Pengecualian dasar untuk kesalahan validasi alur kerja.

WorkflowViz

Kelas untuk memvisualisasikan alur kerja menggunakan graphviz dan Mermaid.

Inisialisasi WorkflowViz dengan alur kerja.

Enum

MagenticHumanInterventionDecision

Opsi keputusan untuk respons intervensi manusia.

MagenticHumanInterventionKind

Jenis intervensi manusia yang diminta.

ValidationTypeEnum

Enumerasi jenis validasi alur kerja.

WorkflowEventSource

Mengidentifikasi apakah peristiwa alur kerja berasal dari kerangka kerja atau pelaksana.

Gunakan FRAMEWORK untuk peristiwa yang dipancarkan oleh jalur orkestrasi bawaan—bahkan ketika kode yang meningkatkannya hidup dalam modul terkait runner—dan PELAKSANA untuk peristiwa yang muncul oleh implementasi pelaksana yang disediakan pengembang.

WorkflowRunState

Status tingkat eksekusi alur kerja.

Semantik:

  • DIMULAI: Eksekusi telah dimulai dan konteks alur kerja telah dibuat. Ini adalah status awal sebelum pekerjaan yang bermakna dilakukan. Dalam basis kode ini kami memancarkan WorkflowStartedEvent khusus untuk telemetri, dan biasanya memajukan status langsung ke IN_PROGRESS. Konsumen mungkin masih mengandalkan START untuk mesin status yang membutuhkan fase pra-kerja eksplisit.

  • IN_PROGRESS: Alur kerja secara aktif dijalankan (misalnya, pesan awal telah dikirimkan ke pelaksana awal atau superstep sedang berjalan). Status ini dipancarkan di awal eksekusi dan dapat diikuti oleh status lain saat eksekusi berlangsung.

  • IN_PROGRESS_PENDING_REQUESTS: Eksekusi aktif sementara satu atau beberapa operasi permintaan informasi luar biasa. Pekerjaan baru mungkin masih dijadwalkan saat permintaan sedang dalam penerbangan.

  • IDLE: Alur kerja berhenti tanpa permintaan yang luar biasa dan tidak ada lagi pekerjaan yang harus dilakukan. Ini adalah status terminal normal untuk alur kerja yang telah selesai dieksekusi, berpotensi menghasilkan output di sepanjang jalan.

  • IDLE_WITH_PENDING_REQUESTS: Alur kerja dijeda menunggu input eksternal (misalnya, memancarkan RequestInfoEvent). Ini adalah status non-terminal; alur kerja dapat dilanjutkan ketika respons disediakan.

  • GAGAL: Status terminal menunjukkan kesalahan muncul. Disertai dengan WorkflowFailedEvent dengan detail kesalahan terstruktur.

  • DIBATALKAN: Status terminal yang menunjukkan eksekusi dibatalkan oleh penelepon atau orkestrator. Saat ini tidak dipancarkan oleh jalur pelari default tetapi disertakan untuk integrator/orkestrator yang mendukung pembatalan.

Fungsi

agent_middleware

Dekorator untuk menandai fungsi sebagai middleware agen.

Dekorator ini secara eksplisit mengidentifikasi fungsi sebagai middleware agen, yang memproses objek AgentRunContext.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Parameter

Nama Deskripsi
func
Diperlukan

Fungsi middleware untuk menandai sebagai middleware agen.

Mengembalikan

Jenis Deskripsi

Fungsi yang sama dengan penanda middleware agen.

Contoh


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Hiasi fungsi untuk mengubahnya menjadi AIFunction yang dapat diteruskan ke model dan dijalankan secara otomatis.

Dekorator ini membuat model Pydantic dari tanda tangan fungsi, yang akan digunakan untuk memvalidasi argumen yang diteruskan ke fungsi dan untuk menghasilkan skema JSON untuk parameter fungsi.

Untuk menambahkan deskripsi ke parameter, gunakan Annotated jenis dari typing dengan deskripsi string sebagai argumen kedua. Anda juga dapat menggunakan kelas Pydantic Field untuk konfigurasi yang lebih canggih.

Nota

Ketika approval_mode diatur ke "always_require", fungsi tidak akan dijalankan

hingga persetujuan eksplisit diberikan, ini hanya berlaku untuk alur pemanggilan otomatis.

Penting juga untuk dicatat bahwa jika model mengembalikan beberapa panggilan fungsi, beberapa yang memerlukan persetujuan

Dan orang lain yang tidak meminta persetujuan untuk semuanya.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Parameter

Nama Deskripsi
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

Fungsi untuk menghias.

Nilai default: None
name
Diperlukan
str | None
description
Diperlukan
str | None
approval_mode
Diperlukan
Literal['always_require', 'never_require'] | None
max_invocations
Diperlukan
int | None
max_invocation_exceptions
Diperlukan
int | None
additional_properties
Diperlukan

Parameter Kata Kunci-Saja

Nama Deskripsi
name

Nama fungsi. Jika tidak disediakan, atribut fungsi __name__ akan digunakan.

Nilai default: None
description

Deskripsi fungsi. Jika tidak disediakan, docstring fungsi akan digunakan.

Nilai default: None
approval_mode

Apakah persetujuan diperlukan atau tidak untuk menjalankan alat ini. Defaultnya adalah persetujuan tidak diperlukan.

Nilai default: None
max_invocations

Frekuensi maksimum fungsi ini dapat dipanggil. Jika Tidak Ada, tidak ada batasan, harus setidaknya 1.

Nilai default: None
max_invocation_exceptions

Jumlah maksimum pengecualian yang diizinkan selama pemanggilan. Jika Tidak Ada, tidak ada batasan, harus setidaknya 1.

Nilai default: None
additional_properties

Properti tambahan untuk diatur pada fungsi .

Nilai default: None

Mengembalikan

Jenis Deskripsi
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Contoh


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Dekorator untuk menandai fungsi sebagai middleware obrolan.

Dekorator ini secara eksplisit mengidentifikasi fungsi sebagai middleware obrolan, yang memproses objek ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Parameter

Nama Deskripsi
func
Diperlukan

Fungsi middleware untuk menandai sebagai middleware obrolan.

Mengembalikan

Jenis Deskripsi

Fungsi yang sama dengan penanda middleware obrolan.

Contoh


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Fungsi pabrik untuk membuat pelari tepi yang sesuai untuk grup tepi.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Parameter

Nama Deskripsi
edge_group
Diperlukan
<xref:agent_framework._workflows._edge.EdgeGroup>

Grup tepi untuk membuat runner.

executors
Diperlukan

Peta ID pelaksana ke instans eksekutor.

Mengembalikan

Jenis Deskripsi
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Instans EdgeRunner yang sesuai.

executor

Dekorator yang mengonversi fungsi mandiri menjadi instans FunctionExecutor.

Dekorator @executor dirancang hanya untuk fungsi tingkat modul mandiri. Untuk pelaksana berbasis kelas, gunakan kelas dasar Pelaksana dengan @handler pada metode instans.

Mendukung fungsi sinkron dan asinkron. Fungsi sinkron dijalankan dalam kumpulan utas untuk menghindari pemblokiran perulangan peristiwa.

Penting

Gunakan @executor untuk fungsi mandiri (fungsi tingkat modul atau lokal)

JANGAN gunakan @executor dengan staticmethod atau classmethod

Untuk pelaksana berbasis kelas, Pelaksana subkelas dan gunakan @handler pada metode instans

Penggunaan:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Parameter

Nama Deskripsi
func
Callable[[...], Any] | None

Fungsi untuk menghias (saat digunakan tanpa tanda kurung)

Nilai default: None
id
Diperlukan
str | None

ID kustom opsional untuk pelaksana. Jika Tidak Ada, menggunakan nama fungsi.

Parameter Kata Kunci-Saja

Nama Deskripsi
id
Nilai default: None

Mengembalikan

Jenis Deskripsi

Instans FunctionExecutor yang dapat dikabeli ke dalam Alur Kerja.

Pengecualian

Jenis Deskripsi

Jika digunakan dengan staticmethod atau classmethod (pola yang tidak didukung)

function_middleware

Dekorator untuk menandai fungsi sebagai middleware fungsi.

Dekorator ini secara eksplisit mengidentifikasi fungsi sebagai middleware fungsi, yang memproses objek FunctionInvocationContext.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Parameter

Nama Deskripsi
func
Diperlukan

Fungsi middleware untuk menandai sebagai middleware fungsi.

Mengembalikan

Jenis Deskripsi

Fungsi yang sama dengan penanda middleware fungsi.

Contoh


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Parameter

Nama Deskripsi
checkpoint
Diperlukan

Mengembalikan

Jenis Deskripsi

get_logger

Dapatkan pencatat dengan nama yang ditentukan, default ke 'agent_framework'.

get_logger(name: str = 'agent_framework') -> Logger

Parameter

Nama Deskripsi
name
str

Nama pencatat. Default ke 'agent_framework'.

Nilai default: "agent_framework"

Mengembalikan

Jenis Deskripsi

Instans pencatat yang dikonfigurasi.

handler

Dekorator untuk mendaftarkan handler untuk pelaksana.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Parameter

Nama Deskripsi
func
Diperlukan
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Fungsi untuk menghias. Bisa Tidak Ada saat digunakan tanpa parameter.

Mengembalikan

Jenis Deskripsi
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Fungsi yang dihiasi dengan metadata handler.

Contoh

@handler asinkron def handle_string(self, message: str, ctx: WorkflowContext[str]) -> Tidak ada:

...

@handler asinkron def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

Siapkan nilai hasil panggilan fungsi.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Parameter

Nama Deskripsi
content
Diperlukan

Mengembalikan

Jenis Deskripsi
str

prepend_agent_framework_to_user_agent

Tambahkan "agent-framework" ke User-Agent di header.

Ketika telemetri agen pengguna dinonaktifkan melalui AGENT_FRAMEWORK_USER_AGENT_DISABLED variabel lingkungan, header User-Agent tidak akan menyertakan informasi kerangka kerja agen. Ini akan dikirim kembali apa adanya, atau sebagai dict kosong ketika Tidak ada yang diteruskan.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Parameter

Nama Deskripsi
headers

Kamus header yang ada.

Nilai default: None

Mengembalikan

Jenis Deskripsi

Dict baru dengan "User-Agent" diatur ke "agent-framework-python/{version}" jika header tidak ada. Kamus header yang dimodifikasi dengan "agent-framework-python/{version}" ditambahkan ke User-Agent.

Contoh


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Dekorator untuk mendaftarkan handler untuk menangani respons permintaan.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Parameter

Nama Deskripsi
func
Diperlukan
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Fungsi untuk menghias.

Mengembalikan

Jenis Deskripsi
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Fungsi yang dihiasi dengan metadata handler.

Contoh


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Siapkan konfigurasi pengelogan untuk kerangka kerja agen.

setup_logging() -> None

Mengembalikan

Jenis Deskripsi

use_agent_middleware

Dekorator kelas yang menambahkan dukungan middleware ke kelas agen.

Dekorator ini menambahkan fungsionalitas middleware ke kelas agen apa pun. Ini membungkus run() metode dan run_stream() untuk memberikan eksekusi middleware.

Eksekusi middleware dapat dihentikan kapan saja dengan mengatur context.terminate properti ke True. Setelah diatur, alur akan berhenti mengeksekusi middleware lebih lanjut segera setelah kontrol kembali ke alur.

Nota

Dekorator ini sudah diterapkan ke kelas agen bawaan. Anda hanya perlu menggunakan

jika Anda membuat implementasi agen kustom.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Parameter

Nama Deskripsi
agent_class
Diperlukan
type[<xref:TAgent>]

Kelas agen untuk menambahkan dukungan middleware.

Mengembalikan

Jenis Deskripsi
type[~<xref:TAgent>]

Kelas agen yang dimodifikasi dengan dukungan middleware.

Contoh


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Dekorator kelas yang menambahkan dukungan middleware ke kelas klien obrolan.

Dekorator ini menambahkan fungsionalitas middleware ke kelas klien obrolan apa pun. Ini membungkus get_response() metode dan get_streaming_response() untuk memberikan eksekusi middleware.

Nota

Dekorator ini sudah diterapkan ke kelas klien obrolan bawaan. Anda hanya perlu menggunakan

jika Anda membuat implementasi klien obrolan kustom.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Parameter

Nama Deskripsi
chat_client_class
Diperlukan
type[<xref:TChatClient>]

Kelas klien obrolan untuk menambahkan dukungan middleware.

Mengembalikan

Jenis Deskripsi
type[~<xref:TChatClient>]

Kelas klien obrolan yang dimodifikasi dengan dukungan middleware.

Contoh


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Dekorator kelas yang memungkinkan panggilan alat untuk klien obrolan.

Dekorator ini membungkus get_response metode dan get_streaming_response untuk secara otomatis menangani panggilan fungsi dari model, menjalankannya, dan mengembalikan hasilnya kembali ke model untuk diproses lebih lanjut.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Parameter

Nama Deskripsi
chat_client
Diperlukan
type[<xref:TChatClient>]

Kelas klien obrolan untuk dihiasi.

Mengembalikan

Jenis Deskripsi
type[~<xref:TChatClient>]

Kelas klien obrolan yang didekorasi dengan pemanggilan fungsi diaktifkan.

Pengecualian

Jenis Deskripsi

Jika klien obrolan tidak memiliki metode yang diperlukan.

Contoh


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Fungsi kenyamanan untuk memvalidasi grafik alur kerja.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Parameter

Nama Deskripsi
edge_groups
Diperlukan
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

daftar grup tepi dalam alur kerja

executors
Diperlukan

Peta ID pelaksana ke instans eksekutor

start_executor
Diperlukan

Pelaksana awal (dapat berupa instans atau ID)

Mengembalikan

Jenis Deskripsi

Pengecualian

Jenis Deskripsi

Jika ada validasi yang gagal