次の方法で共有


コードで AI エージェントを作成する

このページでは、モザイク AI エージェント フレームワークと、LangGraph や OpenAI などの一般的なエージェント作成ライブラリを使用して、Python で AI エージェントを作成する方法を示します。

必要条件

ヒント

Databricks では、エージェントの開発時に最新バージョンの MLflow Python クライアントをインストールすることをお勧めします。

このページの方法を使用してエージェントを作成して展開するには、次をインストールします。

  • databricks-agents 1.2.0 以降
  • mlflow 3.1.3 以降
  • Python 3.10 以降。
    • この要件を満たすには、サーバーレス コンピューティングまたは Databricks Runtime 13.3 LTS 以降を使用します。
%pip install -U -qqqq databricks-agents mlflow

Databricks では、エージェントを作成するために Databricks AI Bridge 統合パッケージをインストールすることも推奨されています。 これらの統合パッケージは、エージェント作成フレームワークと SDK 間で Databricks AI/BI Genie や Vector Search などの Databricks AI 機能と対話する API の共有レイヤーを提供します。

OpenAI

%pip install -U -qqqq databricks-openai

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

DSPy

%pip install -U -qqqq databricks-dspy

純粋な Python エージェント

%pip install -U -qqqq databricks-ai-bridge

ResponsesAgent を使用してエージェントを作成する

Databricks では、運用グレードのエージェントを作成するために、MLflow インターフェイス ResponsesAgent をお勧めします。 ResponsesAgent を使用すると、任意のサードパーティ フレームワークを使用してエージェントを構築し、それを Databricks AI 機能と統合して、堅牢なログ記録、トレース、評価、デプロイ、監視機能を実現できます。

ResponsesAgent スキーマは、OpenAI Responses スキーマと互換性があります。 OpenAI Responsesの詳細については、「 OpenAI: Responses vs. ChatCompletion」を参照してください。

Databricks では、古い ChatAgent インターフェイスが引き続きサポートされています。 ただし、新しいエージェントの場合、Databricks では最新バージョンの MLflow と ResponsesAgent インターフェイスを使用することをお勧めします。

レガシ入力および出力エージェント スキーマを参照してください。

ResponsesAgent は、Databricks の互換性のために既存のエージェントを簡単にラップします。

ResponsesAgent には、次の利点があります。

  • 高度なエージェント機能

    • マルチエージェントのサポート
    • ストリーミング出力: 出力をより小さなチャンクでストリーミングします。
    • 包括的なツール呼び出しメッセージ履歴: 品質と会話管理を向上させるために、中間ツール呼び出しメッセージを含む複数のメッセージを返します。
    • ツール呼び出しの確認のサポート
    • 実行時間の長いツールのサポート
  • 開発、デプロイ、監視の合理化

    • 任意のフレームワークを使用してエージェントを作成する: ResponsesAgent インターフェイスを使用して既存のエージェントをラップして、AI Playground、Agent Evaluation、Agent Monitoring とのすぐに使用できる互換性を実現します。
    • 型指定された作成インターフェイス: IDE とノートブックオートコンプリートの恩恵を受け、型指定された Python クラスを使用してエージェント コードを記述します。
    • 自動署名推論: MLflow は、エージェントのログ記録時に ResponsesAgent 署名を自動的に推論し、登録とデプロイを簡略化します。 「ログ中にモデル シグネチャを推論する」を参照してください。
    • 自動トレース: MLflow は、 predict および predict_stream 関数を自動的にトレースし、ストリーミング応答を集計して、評価と表示を容易にします。
    • AI Gateway で強化された推論テーブル: デプロイされたエージェントに対して AI Gateway 推論テーブルが自動的に有効にされ、詳細な要求ログ メタデータへのアクセスが提供されます。

ResponsesAgentを作成する方法については、次のセクションと MLflow ドキュメントの例を参照してください。

ResponsesAgent

次のノートブックは、一般的なライブラリを使用してストリーミングと非ストリーミングの ResponsesAgent を作成する方法を示しています。 これらのエージェントの機能を拡張する方法については、 AI エージェント ツールを参照してください。

OpenAI

Databricks でホストされるモデルを使用した OpenAI 単純なチャット エージェント

ノートブックを入手

Databricks でホストされるモデルを使用した OpenAI ツール呼び出しエージェント

ノートブックを入手

OpenAI でホストされるモデルを使用した OpenAI ツール呼び出しエージェント

ノートブックを入手

LangGraph

LangGraph ツール呼び出しエージェント

ノートブックを入手

DSPy

DSPy シングルターン ツール呼び出しエージェント

ノートブックを入手

マルチエージェントの例

マルチエージェント システムを作成する方法については、マルチエージェント システム での Genie の使用に関するページを参照してください。

ステートフル エージェントの例

Lakebase をメモリ ストアとして使用して短期および長期のメモリを持つステートフル エージェントを作成する方法については、 AI エージェントのメモリに関するページを参照してください。

非会話型エージェントの例

複数ターンの会話を管理する会話エージェントとは異なり、会話型でないエージェントは、明確に定義されたタスクを効率的に実行することに重点を置いています。 この合理化されたアーキテクチャにより、独立した要求のスループットが向上します。

非会話型エージェントを作成する方法については、 MLflow を使用した非会話型 AI エージェントに関するページを参照してください。

エージェントが既にある場合はどうすればよいですか?

LangChain、LangGraph、または同様のフレームワークで構築されたエージェントが既にある場合は、Databricks でエージェントを使用するようにエージェントを書き直す必要はありません。 代わりに、既存のエージェントを MLflow ResponsesAgent インターフェイスでラップするだけです。

  1. mlflow.pyfunc.ResponsesAgentから継承する Python ラッパー クラスを記述します。

    ラッパー クラス内で、既存のエージェントを属性 self.agent = your_existing_agentとして参照します。

  2. ResponsesAgent クラスでは、ストリーミング以外の要求を処理するpredictを返すResponsesAgentResponse メソッドを実装する必要があります。 ResponsesAgentResponses スキーマの例を次に示します。

    import uuid
    # input as a dict
    {"input": [{"role": "user", "content": "What did the data scientist say when their Spark job finally completed?"}]}
    
    # output example
    ResponsesAgentResponse(
        output=[
            {
                "type": "message",
                "id": str(uuid.uuid4()),
                "content": [{"type": "output_text", "text": "Well, that really sparked joy!"}],
                "role": "assistant",
            },
        ]
    )
    
  3. predict関数で、受信メッセージを ResponsesAgentRequest からエージェントが想定する形式に変換します。 エージェントが応答を生成したら、その出力を ResponsesAgentResponse オブジェクトに変換します。

既存のエージェントを ResponsesAgentに変換する方法については、次のコード例を参照してください。

基本的な変換

非ストリーミング エージェントの場合は、 predict 関数の入力と出力を変換します。

from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
)


class MyWrappedAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Call your existing agent (non-streaming)
        agent_response = self.agent.invoke(messages)

        # Convert your agent's output to ResponsesAgent format, assuming agent_response is a str
        output_item = (self.create_text_output_item(text=agent_response, id=str(uuid4())),)

        # Return the response
        return ResponsesAgentResponse(output=[output_item])

コードを再利用したストリーミング

ストリーミング エージェントの場合は、メッセージを変換するコードが重複しないように、ロジックを賢く再利用できます。

from typing import Generator
from uuid import uuid4

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # Reference your existing agent
        self.agent = agent

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        # prep_msgs_for_llm is a function you write to convert the incoming messages, included in full examples linked below
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # Stream from your existing agent
        item_id = str(uuid4())
        aggregated_stream = ""
        for chunk in self.agent.stream(messages):
            # Convert each chunk to ResponsesAgent format
            yield self.create_text_delta(delta=chunk, item_id=item_id)
            aggregated_stream += chunk

        # Emit an aggregated output_item for all the text deltas with id=item_id
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=aggregated_stream, id=item_id),
        )

ChatCompletions からの移行

既存のエージェントで OpenAI ChatCompletions API を使用している場合は、コア ロジックを書き換えずに ResponsesAgent に移行できます。 次のラッパーを追加します。

  1. 受信 ResponsesAgentRequest メッセージを、エージェントが想定する ChatCompletions 形式に変換します。
  2. ChatCompletions出力をResponsesAgentResponse スキーマに変換します。
  3. 必要に応じて、 ChatCompletions からの増分差分を ResponsesAgentStreamEvent オブジェクトにマッピングすることでストリーミングをサポートします。
from typing import Generator
from uuid import uuid4

from databricks.sdk import WorkspaceClient
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


# Legacy agent that outputs ChatCompletions objects
class LegacyAgent:
    def __init__(self):
        self.w = WorkspaceClient()
        self.OpenAI = self.w.serving_endpoints.get_open_ai_client()

    def stream(self, messages):
        for chunk in self.OpenAI.chat.completions.create(
            model="databricks-claude-sonnet-4-5",
            messages=messages,
            stream=True,
        ):
            yield chunk.to_dict()


# Wrapper that converts the legacy agent to a ResponsesAgent
class MyWrappedStreamingAgent(ResponsesAgent):
    def __init__(self, agent):
        # `agent` is your existing ChatCompletions agent
        self.agent = agent

    def prep_msgs_for_llm(self, messages):
        # dummy example of prep_msgs_for_llm
        # real example of prep_msgs_for_llm included in full examples linked below
        return [{"role": "user", "content": "Hello, how are you?"}]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming predict: collects all streaming chunks into a single response."""
        # Reuse the streaming logic and collect all output items
        output_items = []
        for stream_event in self.predict_stream(request):
            if stream_event.type == "response.output_item.done":
                output_items.append(stream_event.item)

        # Return all collected items as a single response
        return ResponsesAgentResponse(output=output_items)

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming predict: the core logic that both methods use."""
        # Convert incoming messages to your agent's format
        messages = self.prep_msgs_for_llm([i.model_dump() for i in request.input])

        # process the ChatCompletion output stream
        agent_content = ""
        tool_calls = []
        msg_id = None
        for chunk in self.agent.stream(messages):  # call the underlying agent's stream method
            delta = chunk["choices"][0]["delta"]
            msg_id = chunk.get("id", None)
            content = delta.get("content", None)
            if tc := delta.get("tool_calls"):
                if not tool_calls:  # only accommodate for single tool call right now
                    tool_calls = tc
                else:
                    tool_calls[0]["function"]["arguments"] += tc[0]["function"]["arguments"]
            elif content is not None:
                agent_content += content
                yield ResponsesAgentStreamEvent(**self.create_text_delta(content, item_id=msg_id))

        # aggregate the streamed text content
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(agent_content, msg_id),
        )

        for tool_call in tool_calls:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=self.create_function_call_item(
                    str(uuid4()),
                    tool_call["id"],
                    tool_call["function"]["name"],
                    tool_call["function"]["arguments"],
                ),
            )


agent = MyWrappedStreamingAgent(LegacyAgent())

for chunk in agent.predict_stream(
    ResponsesAgentRequest(input=[{"role": "user", "content": "Hello, how are you?"}])
):
    print(chunk)

完全な例については、 ResponsesAgent 例を参照してください。

ストリーミング応答

ストリーミングを使用すると、エージェントは完全な応答を待つ代わりに、リアルタイム チャンクで応答を送信できます。 ResponsesAgentを使用してストリーミングを実装するには、一連のデルタ イベントの後に最終的な完了イベントを出力します。

  1. デルタ イベントを出力する: 同じoutput_text.deltaを持つ複数のitem_id イベントを送信して、テキスト チャンクをリアルタイムでストリーミングします。
  2. 完了イベントで終了: 完全な最終出力テキストを含むデルタ イベントと同じresponse.output_item.doneを使用して、最終的なitem_id イベントを送信します。

各デルタ イベントは、テキストのチャンクをクライアントにストリーミングします。 最後に完了したイベントには、完全な応答テキストが含まれており、Databricks に次のことを行うことを通知します。

  • MLflow トレースを使用してエージェントの出力をトレースする
  • AI Gateway 推論テーブルでストリーミングされた応答を集計する
  • AI Playground UI に完全な出力を表示する

ストリーミングエラーの伝播

モザイク AI は、 databricks_output.errorの最後のトークンでストリーミング中に発生したすべてのエラーを伝達します。 このエラーを適切に処理して表示するのは、呼び出し元のクライアント次第です。

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

高度な機能

カスタム入力と出力

シナリオによっては、 client_typesession_idなどの追加のエージェント入力や、今後の対話のためにチャット履歴に含めてはならない取得ソース リンクなどの出力が必要になる場合があります。

これらのシナリオでは、MLflow ResponsesAgent は、 custom_inputs および custom_outputsフィールドをネイティブにサポートします。 request.custom_inputs でリンクされているすべての例のを使用して、カスタム入力にアクセスできます。

警告

エージェント評価レビュー アプリでは、追加の入力フィールドを持つエージェントのトレースのレンダリングはサポートされていません。

カスタム入力と出力を設定する方法については、次のノートブックを参照してください。

AI プレイグラウンドで custom_inputs を提供し、アプリを確認する

エージェントが custom_inputs フィールドを使用して追加の入力を受け入れる場合は、 AI Playgroundレビュー アプリの両方でこれらの入力を手動で指定できます。

  1. AI プレイグラウンドまたはエージェントレビューアプリにある歯車アイコン 歯車アイコンを選択します。

  2. custom_inputsを有効にします。

  3. エージェントの定義された入力スキーマに一致する JSON オブジェクトを指定します。

    AI プレイグラウンドでカスタム入力を提供します。

カスタム取得スキーマを指定する

AI エージェントは、一般的に、ベクター検索インデックスから非構造化データを検索してクエリを実行するために、レトリバーを使用します。 レトリバー ツールの例については、 非構造化データの取得ツールのビルドとトレースを参照してください。

次のような Databricks 製品の機能を有効にするには、 MLflow RETRIEVER スパン を使用してエージェント内でこれらのレトリバーをトレースします。

  • AI Playground UI で取得したソース ドキュメントへのリンクを自動的に表示する
  • エージェント評価における検索の根拠と関連性の判断を自動的に実行する

Databricks では、 databricks_langchain.VectorSearchRetrieverTooldatabricks_openai.VectorSearchRetrieverTool などの Databricks AI Bridge パッケージによって提供されるレトリバー ツールを使用することをお勧めします。これは、既に MLflow レトリバー スキーマに準拠しているためです。 AI Bridge を使用したベクター検索取得ツールのローカル開発を参照してください。

エージェントにカスタム スキーマを含むレトリバー スパンが含まれている場合は、コードでエージェントを定義するときに mlflow.models.set_retriever_schema を呼び出します。 これにより、取得元の出力列が MLflow の予期されるフィールド (primary_keytext_columndoc_uri) にマップされます。

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

doc_uri列は、レトリバーのパフォーマンスを評価する際に特に重要です。 doc_uri は、レトリバーによって返されるドキュメントの主な識別子であり、地上真偽評価セットと比較できます。 評価セット (MLflow 2) を参照してください。

デプロイに関する考慮事項

Databricks モデルサービスの準備

Databricks は、Databricks Model Serving の分散環境に ResponsesAgentをデプロイします。 これは、複数ターンの会話中に、同じサービス レプリカがすべての要求を処理しない可能性があることを意味します。 エージェントの状態を管理する場合は、次の影響に注意してください。

  • ローカル キャッシュを回避する: ResponsesAgentをデプロイするときに、同じレプリカが複数ターンの会話内のすべての要求を処理すると想定しないでください。 ターンごとにディクショナリ ResponsesAgentRequest スキーマを使用して内部状態を再構築します。

  • スレッド セーフな状態: エージェントの状態をスレッド セーフに設計し、マルチスレッド環境での競合を防ぎます。

  • predict関数の状態を初期化する: 初期化中ではなく、predict関数が呼び出されるたびに状態ResponsesAgent初期化します。 ResponsesAgent レベルで状態を格納すると、1 つのResponsesAgent レプリカが複数の会話からの要求を処理できるため、会話間の情報が漏えいし、競合が発生する可能性があります。

環境間でのデプロイ用にコードをパラメーター化する

エージェント コードをパラメーター化して、異なる環境で同じエージェント コードを再利用します。

パラメーターは、Python ディクショナリまたは .yaml ファイルで定義するキーと値のペアです。

コードを構成するには、Python ディクショナリまたはModelConfig ファイルを使用して.yamlを作成します。 ModelConfig は、柔軟な構成管理を可能にするキー値パラメーターのセットです。 たとえば、開発中にディクショナリを使用し、運用環境のデプロイと CI/CD 用の .yaml ファイルに変換できます。

ModelConfigの例を次に示します。

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

エージェント コードでは、 .yaml ファイルまたはディクショナリから既定の (開発) 構成を参照できます。

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

次に、エージェントのログを記録するときに、model_config パラメーターをlog_modelに指定して、ログ記録したエージェントを読み込む際に使用するカスタムパラメーターのセットを指定します。 MLflow のドキュメント - ModelConfig を参照してください。

同期コードまたはコールバック パターンを使用する

安定性と互換性を確保するには、エージェントの実装で同期コードまたはコールバック ベースのパターンを使用します。

Azure Databricks は、エージェントをデプロイするときに最適なコンカレンシーとパフォーマンスを提供するために、非同期通信を自動的に管理します。 カスタム イベント ループや非同期フレームワークを導入すると、 RuntimeError: This event loop is already running and caused unpredictable behaviorなどのエラーが発生する可能性があります。

Azure Databricks では、エージェントの開発時に asyncio の使用やカスタム イベント ループの作成などの非同期プログラミングを回避することをお勧めします。

次のステップ