このページでは、モザイク AI エージェント フレームワークと、LangGraph や OpenAI などの一般的なエージェント作成ライブラリを使用して、Python で AI エージェントを作成する方法を示します。
必要条件
ヒント
Databricks では、エージェントの開発時に最新バージョンの MLflow Python クライアントをインストールすることをお勧めします。
このページの方法を使用してエージェントを作成して展開するには、次をインストールします。
-
databricks-agents1.2.0 以降 -
mlflow3.1.3 以降 - Python 3.10 以降。
- この要件を満たすには、サーバーレス コンピューティングまたは Databricks Runtime 13.3 LTS 以降を使用します。
%pip install -U -qqqq databricks-agents mlflow
Databricks では、エージェントを作成するために Databricks AI Bridge 統合パッケージをインストールすることも推奨されています。 これらの統合パッケージは、エージェント作成フレームワークと SDK 間で Databricks AI/BI Genie や Vector Search などの Databricks AI 機能と対話する API の共有レイヤーを提供します。
OpenAI
%pip install -U -qqqq databricks-openai
LangChain/LangGraph
%pip install -U -qqqq databricks-langchain
DSPy
%pip install -U -qqqq databricks-dspy
純粋な Python エージェント
%pip install -U -qqqq databricks-ai-bridge
ResponsesAgent を使用してエージェントを作成する
Databricks では、運用グレードのエージェントを作成するために、MLflow インターフェイス ResponsesAgent をお勧めします。
ResponsesAgent を使用すると、任意のサードパーティ フレームワークを使用してエージェントを構築し、それを Databricks AI 機能と統合して、堅牢なログ記録、トレース、評価、デプロイ、監視機能を実現できます。
ResponsesAgent スキーマは、OpenAI Responses スキーマと互換性があります。 OpenAI Responsesの詳細については、「 OpenAI: Responses vs. ChatCompletion」を参照してください。
注
Databricks では、古い ChatAgent インターフェイスが引き続きサポートされています。 ただし、新しいエージェントの場合、Databricks では最新バージョンの MLflow と ResponsesAgent インターフェイスを使用することをお勧めします。
レガシ入力および出力エージェント スキーマを参照してください。
ResponsesAgent には、次の利点があります。
高度なエージェント機能
- マルチエージェントのサポート
- ストリーミング出力: 出力をより小さなチャンクでストリーミングします。
- 包括的なツール呼び出しメッセージ履歴: 品質と会話管理を向上させるために、中間ツール呼び出しメッセージを含む複数のメッセージを返します。
- ツール呼び出しの確認のサポート
- 実行時間の長いツールのサポート
開発、デプロイ、監視の合理化
-
任意のフレームワークを使用してエージェントを作成する:
ResponsesAgentインターフェイスを使用して既存のエージェントをラップして、AI Playground、Agent Evaluation、Agent Monitoring とのすぐに使用できる互換性を実現します。 - 型指定された作成インターフェイス: IDE とノートブックオートコンプリートの恩恵を受け、型指定された Python クラスを使用してエージェント コードを記述します。
-
自動署名推論: MLflow は、エージェントのログ記録時に
ResponsesAgent署名を自動的に推論し、登録とデプロイを簡略化します。 「ログ中にモデル シグネチャを推論する」を参照してください。 -
自動トレース: MLflow は、
predictおよびpredict_stream関数を自動的にトレースし、ストリーミング応答を集計して、評価と表示を容易にします。 - AI Gateway で強化された推論テーブル: デプロイされたエージェントに対して AI Gateway 推論テーブルが自動的に有効にされ、詳細な要求ログ メタデータへのアクセスが提供されます。
-
任意のフレームワークを使用してエージェントを作成する:
ResponsesAgentを作成する方法については、次のセクションと MLflow ドキュメントの例を参照してください。
ResponsesAgent 例
次のノートブックは、一般的なライブラリを使用してストリーミングと非ストリーミングの ResponsesAgent を作成する方法を示しています。 これらのエージェントの機能を拡張する方法については、 AI エージェント ツールを参照してください。
OpenAI
Databricks でホストされるモデルを使用した OpenAI 単純なチャット エージェント
Databricks でホストされるモデルを使用した OpenAI ツール呼び出しエージェント
OpenAI でホストされるモデルを使用した OpenAI ツール呼び出しエージェント
LangGraph
LangGraph ツール呼び出しエージェント
DSPy
DSPy シングルターン ツール呼び出しエージェント
マルチエージェントの例
マルチエージェント システムを作成する方法については、マルチエージェント システム での Genie の使用に関するページを参照してください。
ステートフル エージェントの例
Lakebase をメモリ ストアとして使用して短期および長期のメモリを持つステートフル エージェントを作成する方法については、 AI エージェントのメモリに関するページを参照してください。
非会話型エージェントの例
複数ターンの会話を管理する会話エージェントとは異なり、会話型でないエージェントは、明確に定義されたタスクを効率的に実行することに重点を置いています。 この合理化されたアーキテクチャにより、独立した要求のスループットが向上します。
非会話型エージェントを作成する方法については、 MLflow を使用した非会話型 AI エージェントに関するページを参照してください。
エージェントが既にある場合はどうすればよいですか?
LangChain、LangGraph、または同様のフレームワークで構築されたエージェントが既にある場合は、Databricks でエージェントを使用するようにエージェントを書き直す必要はありません。 代わりに、既存のエージェントを MLflow ResponsesAgent インターフェイスでラップするだけです。
mlflow.pyfunc.ResponsesAgentから継承する Python ラッパー クラスを記述します。ラッパー クラス内で、既存のエージェントを属性
self.agent = your_existing_agentとして参照します。ResponsesAgentクラスでは、ストリーミング以外の要求を処理するpredictを返すResponsesAgentResponseメソッドを実装する必要があります。ResponsesAgentResponsesスキーマの例を次に示します。import uuid # input as a dict {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]} # output example ResponsesAgentResponse( output=[ { "type": "message", "id": str(uuid.uuid4()), "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}], "role": "assistant", }, ] )predict関数で、受信メッセージをResponsesAgentRequestからエージェントが想定する形式に変換します。 エージェントが応答を生成したら、その出力をResponsesAgentResponseオブジェクトに変換します。
既存のエージェントを ResponsesAgentに変換する方法については、次のコード例を参照してください。
基本的な変換
非ストリーミング エージェントの場合は、 predict 関数の入力と出力を変換します。
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
)
class MyWrappedAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Call your existing agent (non-streaming)
agent_response = self.agent.invoke(messages)
# Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)
# Return the response
return ResponsesAgentResponse(output=[output_item])
コードを再利用したストリーミング
ストリーミング エージェントの場合は、メッセージを変換するコードが重複しないように、ロジックを賢く再利用できます。
from typing import Generator
from uuid import uuid4
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# Reference your existing agent
self.agent = agent
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
# prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# Stream from your existing agent
item_id = str(uuid4())
aggregated_stream = ""
for chunk in self.agent.stream(messages):
# Convert each chunk to ResponsesAgent format
yield self.create_text_delta(delta=chunk, item_id=item_id)
aggregated_stream += chunk
# Emit an aggregated output_item for all the text deltas with id=item_id
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(text=aggregated_stream, id=item_id),
)
ChatCompletions からの移行
既存のエージェントで OpenAI ChatCompletions API を使用している場合は、コア ロジックを書き換えずに ResponsesAgent に移行できます。 次のラッパーを追加します。
- 受信
ResponsesAgentRequestメッセージを、エージェントが想定するChatCompletions形式に変換します。 -
ChatCompletions出力をResponsesAgentResponseスキーマに変換します。 - 必要に応じて、
ChatCompletionsからの増分差分をResponsesAgentStreamEventオブジェクトにマッピングすることでストリーミングをサポートします。
from typing import Generator
from uuid import uuid4
from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
def __init__(self):
self.w = WorkspaceClient()
self.OpenAI = self.w.serving_endpoints.get_open_ai_client()
def stream(self, messages):
for chunk in self.OpenAI.chat.completions.create(
model="databricks-claude-sonnet-4-5",
messages=messages,
stream=True,
):
yield chunk.to_dict()
# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
def __init__(self, agent):
# `agent` is your existing ChatCompletions agent
self.agent = agent
def prep_msgs_for_llm(self, messages):
# dummy example of prep_msgs_for_llm
# real example of prep_msgs_for_llm included in full examples linked below
return [{"role": "user", "content": "Hello, how are you?"}]
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""Non-streaming predict: collects all streaming chunks into a single response."""
# Reuse the streaming logic and collect all output items
output_items = []
for stream_event in self.predict_stream(request):
if stream_event.type == "response.output_item.done":
output_items.append(stream_event.item)
# Return all collected items as a single response
return ResponsesAgentResponse(output=output_items)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""Streaming predict: the core logic that both methods use."""
# Convert incoming messages to your agent's format
messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])
# process the ChatCompletion output stream
agent_content = ""
tool_calls = []
msg_id = None
for chunk in self.agent.stream(messages): # call the underlying agent's stream method
delta = chunk["choices"][0]["delta"]
msg_id = chunk.get("id", None)
content = delta.get("content", None)
if tc := delta.get("tool_calls"):
if not tool_calls: # only accommodate for single tool call right now
tool_calls = tc
else:
tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
elif content is not None:
agent_content += content
yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))
# aggregate the streamed text content
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(agent_content, msg_id),
)
for tool_call in tool_calls:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
str(uuid4()),
tool_call["id"],
tool_call["function"]["name"],
tool_call["function"]["arguments"],
),
)
agent = MyWrappedStreamingAgent(LegacyAgent())
for chunk in agent.predict_stream(
ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
print(chunk)
完全な例については、 ResponsesAgent 例を参照してください。
ストリーミング応答
ストリーミングを使用すると、エージェントは完全な応答を待つ代わりに、リアルタイム チャンクで応答を送信できます。
ResponsesAgentを使用してストリーミングを実装するには、一連のデルタ イベントの後に最終的な完了イベントを出力します。
-
デルタ イベントを出力する: 同じ
output_text.deltaを持つ複数のitem_idイベントを送信して、テキスト チャンクをリアルタイムでストリーミングします。 -
完了イベントで終了: 完全な最終出力テキストを含むデルタ イベントと同じ
response.output_item.doneを使用して、最終的なitem_idイベントを送信します。
各デルタ イベントは、テキストのチャンクをクライアントにストリーミングします。 最後に完了したイベントには、完全な応答テキストが含まれており、Databricks に次のことを行うことを通知します。
- MLflow トレースを使用してエージェントの出力をトレースする
- AI Gateway 推論テーブルでストリーミングされた応答を集計する
- AI Playground UI に完全な出力を表示する
ストリーミングエラーの伝播
モザイク AI は、 databricks_output.errorの最後のトークンでストリーミング中に発生したすべてのエラーを伝達します。 このエラーを適切に処理して表示するのは、呼び出し元のクライアント次第です。
{
"delta": …,
"databricks_output": {
"trace": {...},
"error": {
"error_code": BAD_REQUEST,
"message": "TimeoutException: Tool XYZ failed to execute."
}
}
}
高度な機能
カスタム入力と出力
シナリオによっては、 client_type や session_idなどの追加のエージェント入力や、今後の対話のためにチャット履歴に含めてはならない取得ソース リンクなどの出力が必要になる場合があります。
これらのシナリオでは、MLflow ResponsesAgent は、 custom_inputs および custom_outputsフィールドをネイティブにサポートします。
request.custom_inputs でリンクされているすべての例のを使用して、カスタム入力にアクセスできます。
警告
エージェント評価レビュー アプリでは、追加の入力フィールドを持つエージェントのトレースのレンダリングはサポートされていません。
カスタム入力と出力を設定する方法については、次のノートブックを参照してください。
AI プレイグラウンドで custom_inputs を提供し、アプリを確認する
エージェントが custom_inputs フィールドを使用して追加の入力を受け入れる場合は、 AI Playground と レビュー アプリの両方でこれらの入力を手動で指定できます。
AI プレイグラウンドまたはエージェントレビューアプリにある歯車アイコン
を選択します。
custom_inputsを有効にします。
エージェントの定義された入力スキーマに一致する JSON オブジェクトを指定します。
カスタム取得スキーマを指定する
AI エージェントは、一般的に、ベクター検索インデックスから非構造化データを検索してクエリを実行するために、レトリバーを使用します。 レトリバー ツールの例については、 非構造化データの取得ツールのビルドとトレースを参照してください。
次のような Databricks 製品の機能を有効にするには、 MLflow RETRIEVER スパン を使用してエージェント内でこれらのレトリバーをトレースします。
- AI Playground UI で取得したソース ドキュメントへのリンクを自動的に表示する
- エージェント評価における検索の根拠と関連性の判断を自動的に実行する
注
Databricks では、 databricks_langchain.VectorSearchRetrieverTool や databricks_openai.VectorSearchRetrieverTool などの Databricks AI Bridge パッケージによって提供されるレトリバー ツールを使用することをお勧めします。これは、既に MLflow レトリバー スキーマに準拠しているためです。
AI Bridge を使用したベクター検索取得ツールのローカル開発を参照してください。
エージェントにカスタム スキーマを含むレトリバー スパンが含まれている場合は、コードでエージェントを定義するときに mlflow.models.set_retriever_schema を呼び出します。 これにより、取得元の出力列が MLflow の予期されるフィールド (primary_key、 text_column、 doc_uri) にマップされます。
import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
# {
# 'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
# 'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
# 'doc_uri': 'https://mlflow.org/docs/latest/index.html',
# 'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
# },
# {
# 'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
# 'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
# 'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
# 'title': 'Getting Started with MLflow'
# },
# ...
# ]
mlflow.models.set_retriever_schema(
# Specify the name of your retriever span
name="mlflow_docs_vector_search",
# Specify the output column name to treat as the primary key (ID) of each retrieved document
primary_key="document_id",
# Specify the output column name to treat as the text content (page content) of each retrieved document
text_column="chunk_text",
# Specify the output column name to treat as the document URI of each retrieved document
doc_uri="doc_uri",
# Specify any other columns returned by the retriever
other_columns=["title"],
)
注
doc_uri列は、レトリバーのパフォーマンスを評価する際に特に重要です。
doc_uri は、レトリバーによって返されるドキュメントの主な識別子であり、地上真偽評価セットと比較できます。
評価セット (MLflow 2) を参照してください。
デプロイに関する考慮事項
Databricks モデルサービスの準備
Databricks は、Databricks Model Serving の分散環境に ResponsesAgentをデプロイします。 これは、複数ターンの会話中に、同じサービス レプリカがすべての要求を処理しない可能性があることを意味します。 エージェントの状態を管理する場合は、次の影響に注意してください。
ローカル キャッシュを回避する:
ResponsesAgentをデプロイするときに、同じレプリカが複数ターンの会話内のすべての要求を処理すると想定しないでください。 ターンごとにディクショナリResponsesAgentRequestスキーマを使用して内部状態を再構築します。スレッド セーフな状態: エージェントの状態をスレッド セーフに設計し、マルチスレッド環境での競合を防ぎます。
predict関数の状態を初期化する: 初期化中ではなく、predict関数が呼び出されるたびに状態ResponsesAgent初期化します。ResponsesAgentレベルで状態を格納すると、1 つのResponsesAgentレプリカが複数の会話からの要求を処理できるため、会話間の情報が漏えいし、競合が発生する可能性があります。
環境間でのデプロイ用にコードをパラメーター化する
エージェント コードをパラメーター化して、異なる環境で同じエージェント コードを再利用します。
パラメーターは、Python ディクショナリまたは .yaml ファイルで定義するキーと値のペアです。
コードを構成するには、Python ディクショナリまたはModelConfig ファイルを使用して.yamlを作成します。
ModelConfig は、柔軟な構成管理を可能にするキー値パラメーターのセットです。 たとえば、開発中にディクショナリを使用し、運用環境のデプロイと CI/CD 用の .yaml ファイルに変換できます。
ModelConfigの例を次に示します。
llm_parameters:
max_tokens: 500
temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
question that indicates your prompt template came from a YAML file. Your response
must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
- question
エージェント コードでは、 .yaml ファイルまたはディクショナリから既定の (開発) 構成を参照できます。
import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)
# Example of using a dictionary
config_dict = {
"prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
"prompt_template_input_vars": ["question"],
"model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
"llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}
model_config = mlflow.models.ModelConfig(development_config=config_dict)
# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')
次に、エージェントのログを記録するときに、model_config パラメーターをlog_modelに指定して、ログ記録したエージェントを読み込む際に使用するカスタムパラメーターのセットを指定します。
MLflow のドキュメント - ModelConfig を参照してください。
同期コードまたはコールバック パターンを使用する
安定性と互換性を確保するには、エージェントの実装で同期コードまたはコールバック ベースのパターンを使用します。
Azure Databricks は、エージェントをデプロイするときに最適なコンカレンシーとパフォーマンスを提供するために、非同期通信を自動的に管理します。 カスタム イベント ループや非同期フレームワークを導入すると、 RuntimeError: This event loop is already running and caused unpredictable behaviorなどのエラーが発生する可能性があります。
Azure Databricks では、エージェントの開発時に asyncio の使用やカスタム イベント ループの作成などの非同期プログラミングを回避することをお勧めします。
次のステップ
- 独自のエージェント ツールを作成します。
- AI エージェントのをログに記録します。
- AI エージェントにトレースを追加します。
- AI エージェントをデプロイします。