次の方法で共有


コードで AI エージェントを作成する

このページでは、モザイク AI エージェント フレームワークと、LangGraph、PyFunc、OpenAI などの一般的なエージェント作成ライブラリを使用して、Python で AI エージェントを作成する方法を示します。

必要条件

ヒント

Databricks では、エージェントの開発時に最新バージョンの MLflow Python クライアントをインストールすることをお勧めします。

このページの方法を使用してエージェントを作成して展開するには、次をインストールします。

  • databricks-agents 0.16.0 以降
  • mlflow 2.20.2 以降
  • Python 3.10 以降。
    • この要件を満たすために、サーバーレス コンピューティングまたは Databricks Runtime 13.3 LTS 以降を使用できます。
%pip install -U -qqqq databricks-agents>=0.16.0 mlflow>=2.20.2

Databricks では、エージェントの作成時に Databricks AI Bridge 統合パッケージをインストールすることも推奨されています。 これらの統合パッケージ ( databricks-langchaindatabricks-openai など) は、エージェント作成フレームワークと SDK 間で Databricks AI/BI Genie や Vector Search などの Databricks AI 機能と対話するための API の共有レイヤーを提供します。

LangChain/LangGraph

%pip install -U -qqqq databricks-langchain

オープンAI

%pip install -U -qqqq databricks-openai

純粋な Python エージェント

%pip install -U -qqqq databricks-ai-bridge

ChatAgent を使用してエージェントを作成する

Databricks では、運用グレードのエージェントを作成するために MLflow ChatAgent インターフェイスをお勧めします。 このチャット スキーマの仕様は、OpenAI ChatCompletion スキーマに似ていますが、厳密には互換性がありません。

ChatAgent は、Databricks の互換性のために既存のエージェントを簡単にラップします。

ChatAgent には、次の利点があります。

  • 高度なエージェント機能

    • マルチエージェントのサポート
    • ストリーミング出力: 出力をより小さなチャンクでストリーミングすることで、対話型ユーザー エクスペリエンスを有効にします。
    • 包括的なツール呼び出しメッセージ履歴: 品質と会話管理を向上させるために、中間ツール呼び出しメッセージを含む複数のメッセージを返します。
    • ツール呼び出しの確認のサポート
  • 開発、デプロイ、監視の合理化

    • 任意のフレームワークを使用してエージェントを作成する: ChatAgent インターフェイスを使用して既存のエージェントをラップして、AI Playground、Agent Evaluation、Agent Monitoring とのすぐに使用できる互換性を実現します。
    • 型指定された作成インターフェイス: IDE とノートブックオートコンプリートの恩恵を受け、型指定された Python クラスを使用してエージェント コードを記述します。
    • 自動署名推論: MLflow は、エージェントのログ記録時に ChatAgent 署名を自動的に推論し、登録とデプロイを簡略化します。 「ログ中にモデル シグネチャを推論する」を参照してください。
    • AI Gateway で強化された推論テーブル: デプロイされたエージェントに対して AI Gateway 推論テーブルが自動的に有効にされ、詳細な要求ログ メタデータへのアクセスが提供されます。

ChatAgentを作成する方法については、次のセクションの例と MLflow ドキュメント - ChatAgent インターフェイスとは何かを参照してください。

エージェントが既にある場合はどうすればよいですか?

LangChain、LangGraph、または同様のフレームワークで構築されたエージェントが既にある場合は、Databricks でエージェントを使用するようにエージェントを書き直す必要はありません。 代わりに、既存のエージェントを MLflow ChatAgent インターフェイスでラップするだけです。

  1. mlflow.pyfunc.ChatAgentから継承する Python ラッパー クラスを記述します。

    ラッパー クラス内で、既存のエージェントを属性 self.agent = your_existing_agentのままにします。

  2. ChatAgent クラスでは、ストリーミング以外の要求を処理するpredict メソッドを実装する必要があります。

    predict は以下を受け入れる必要があります。

    • messages: list[ChatAgentMessage]:ロール ("user" や "assistant" など)、プロンプト、ID を持つ各 ChatAgentMessage の一覧です。

    • (省略可能) 追加データ用の context: Optional[ChatContext] および custom_inputs: Optional[dict]

    import uuid
    
    # input example
    [
      ChatAgentMessage(
        id=str(uuid.uuid4()),  # Generate a unique ID for each message
        role="user",
        content="What's the weather in Paris?"
      )
    ]
    

    predictChatAgentResponseを返す必要があります。

    import uuid
    
    # output example
    ChatAgentResponse(
      messages=[
        ChatAgentMessage(
          id=str(uuid.uuid4()),  # Generate a unique ID for each message
          role="assistant",
          content="It's sunny in Paris."
        )
      ]
    )
    
  3. 形式間で変換する

    predictで、受信メッセージを list[ChatAgentMessage] からエージェントが想定する入力形式に変換します。

    エージェントが応答を生成したら、その出力を 1 つ以上の ChatAgentMessage オブジェクトに変換し、 ChatAgentResponseでラップします。

ヒント

LangChain の出力を自動的に変換する

LangChain エージェントをラップする場合は、 mlflow.langchain.output_parsers.ChatAgentOutputParser を使用して、LangChain 出力を MLflow ChatAgentMessage および ChatAgentResponse スキーマに自動的に変換できます。

エージェントを変換するための簡略化されたテンプレートを次に示します。

from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import ChatAgentMessage, ChatAgentResponse, ChatAgentChunk
import uuid


class MyWrappedAgent(ChatAgent):
  def __init__(self, agent):
    self.agent = agent

  def predict(self, messages, context=None, custom_inputs=None):
    # Convert messages to your agent's format
    agent_input = ... # build from messages
    agent_output = self.agent.invoke(agent_input)
    # Convert output to ChatAgentMessage
    return ChatAgentResponse(
      messages=[ChatAgentMessage(role="assistant", content=agent_output, id=str(uuid.uuid4()),)]
    )

  def predict_stream(self, messages, context=None, custom_inputs=None):
    # If your agent supports streaming
    for chunk in self.agent.stream(...):
      yield ChatAgentChunk(delta=ChatAgentMessage(role="assistant", content=chunk, id=str(uuid.uuid4())))

完全な例については、次のセクションのノートブックを参照してください。

ChatAgent の例

次のノートブックでは、一般的なライブラリ OpenAI、LangGraph、AutoGen を使用して、ストリーミング ChatAgents と非ストリーミング を作成する方法を示します。

LangGraph

LangChain エージェントをラップする場合は、 mlflow.langchain.output_parsers.ChatAgentOutputParser を使用して、LangChain 出力を MLflow ChatAgentMessage および ChatAgentResponse スキーマに自動的に変換できます。

LangGraph ツール呼び出しエージェント

ノートブックを入手

オープンAI

OpenAI ツール呼び出しエージェント

ノートブックを入手

OpenAI Responses API ツール呼び出しエージェント

ノートブックを入手

OpenAI チャット専用エージェント

ノートブックを入手

AutoGen

AutoGen ツール呼び出しエージェント

ノートブックを入手

DSPy

DSPy チャット専用エージェント

ノートブックを入手

ツールを追加してこれらのエージェントの機能を拡張する方法については、 AI エージェント ツールを参照してください。

マルチエージェントの例

Genie を使用してマルチエージェント システムを作成する方法については、 マルチエージェント システムでの Genie の使用に関するページを参照してください。

ストリーミング出力エージェント

ストリーミング エージェントは、より小さい増分チャンクの連続ストリームで応答を配信します。 ストリーミングにより、認識される待機時間が短縮され、会話エージェントのユーザー エクスペリエンスが向上します。

ストリーミング ChatAgentを作成するには、predict_stream オブジェクトを生成するジェネレーターを返すChatAgentChunk メソッドを定義します。各ChatAgentChunkには応答の一部が含まれます。 ChatAgentの理想的なストリーミング動作の詳細を参照してください。

次のコードは、 predict_stream 関数の例を示しています。ストリーミング エージェントの完全な例については、 ChatAgent の例を参照してください。

def predict_stream(
  self,
  messages: list[ChatAgentMessage],
  context: Optional[ChatContext] = None,
  custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
  # Convert messages to a format suitable for your agent
  request = {"messages": self._convert_messages_to_dict(messages)}

  # Stream the response from your agent
  for event in self.agent.stream(request, stream_mode="updates"):
    for node_data in event.values():
      # Yield each chunk of the response
      yield from (
        ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
      )

Databricks Model Serving 向けのデプロイ対応 ChatAgent を作成する

Databricks は、Databricks Model Serving の分散環境に ChatAgentをデプロイします。つまり、複数ターンの会話中に、同じサービス レプリカですべての要求が処理されない可能性があります。 エージェントの状態を管理する場合は、次の影響に注意してください。

  • ローカル キャッシュを回避する: ChatAgentをデプロイするときに、同じレプリカが複数ターンの会話内のすべての要求を処理すると想定しないでください。 ターンごとにディクショナリ ChatAgentRequest スキーマを使用して内部状態を再構築します。

  • スレッド セーフな状態: エージェントの状態をスレッド セーフに設計し、マルチスレッド環境での競合を防ぎます。

  • predict関数の状態を初期化する: 初期化中ではなく、predict関数が呼び出されるたびに状態ChatAgent初期化します。 ChatAgent レベルで状態を格納すると、1 つのChatAgent レプリカが複数の会話からの要求を処理できるため、会話間の情報が漏えいし、競合が発生する可能性があります。

カスタム入力と出力

シナリオによっては、 client_typesession_idなどの追加のエージェント入力や、今後の対話のためにチャット履歴に含めてはならない取得ソース リンクなどの出力が必要になる場合があります。

これらのシナリオでは、MLflow ChatAgent は、 custom_inputs および custom_outputsフィールドをネイティブにサポートします。

警告

エージェント評価レビュー アプリは、現在、追加の入力フィールドを持つエージェントのトレースのレンダリングをサポートしていません。

カスタム入力と出力を設定する方法については、次のノートブックを参照してください。

OpenAI + PyFunc カスタム スキーマ エージェント ノートブック

ノートブックを入手

LangGraph カスタム スキーマ エージェント ノートブック

ノートブックを入手

AI プレイグラウンドとエージェント レビュー アプリで custom_inputs を提供する

エージェントが custom_inputs フィールドを使用して追加の入力を受け入れる場合、AI Playgroundエージェントレビューアプリの両方でこれらの入力を手動で提供できます。

  1. AI プレイグラウンドまたはエージェントレビューアプリにある歯車アイコン 歯車アイコンを選択します。

  2. custom_inputsを有効にします。

  3. エージェントの定義された入力スキーマに一致する JSON オブジェクトを指定します。

    AI プレイグラウンドでカスタム入力を提供します。

カスタム取得スキーマを指定する

AI エージェントは、一般的に、ベクター検索インデックスから非構造化データを検索してクエリを実行するために、レトリバーを使用します。 レトリバー ツールの例については、 非構造化データの取得ツールのビルドとトレースを参照してください。

次のような Databricks 製品の機能を有効にするには、 MLflow RETRIEVER スパン を使用してエージェント内でこれらのレトリバーをトレースします。

  • AI Playground UI で取得したソース ドキュメントへのリンクを自動的に表示する
  • エージェント評価における検索の根拠と関連性の判断を自動的に実行する

Databricks では、 databricks_langchain.VectorSearchRetrieverTooldatabricks_openai.VectorSearchRetrieverTool などの Databricks AI Bridge パッケージによって提供されるレトリバー ツールを使用することをお勧めします。これは、既に MLflow レトリバー スキーマに準拠しているためです。 AI Bridge を使用したベクター検索取得ツールのローカル開発を参照してください。

エージェントにカスタム スキーマを含む取得機能スパンが含まれている場合は、コードでエージェントを定義するときに mlflow.models.set_retriever_schema を呼び出します。 これにより、取得元の出力列が MLflow の予期されるフィールド (primary_keytext_columndoc_uri) にマップされます。

import mlflow
# Define the retriever's schema by providing your column names
# For example, the following call specifies the schema of a retriever that returns a list of objects like
# [
#     {
#         'document_id': '9a8292da3a9d4005a988bf0bfdd0024c',
#         'chunk_text': 'MLflow is an open-source platform, purpose-built to assist machine learning practitioners...',
#         'doc_uri': 'https://mlflow.org/docs/latest/index.html',
#         'title': 'MLflow: A Tool for Managing the Machine Learning Lifecycle'
#     },
#     {
#         'document_id': '7537fe93c97f4fdb9867412e9c1f9e5b',
#         'chunk_text': 'A great way to get started with MLflow is to use the autologging feature. Autologging automatically logs your model...',
#         'doc_uri': 'https://mlflow.org/docs/latest/getting-started/',
#         'title': 'Getting Started with MLflow'
#     },
# ...
# ]
mlflow.models.set_retriever_schema(
    # Specify the name of your retriever span
    name="mlflow_docs_vector_search",
    # Specify the output column name to treat as the primary key (ID) of each retrieved document
    primary_key="document_id",
    # Specify the output column name to treat as the text content (page content) of each retrieved document
    text_column="chunk_text",
    # Specify the output column name to treat as the document URI of each retrieved document
    doc_uri="doc_uri",
    # Specify any other columns returned by the retriever
    other_columns=["title"],
)

doc_uri列は、レトリバーのパフォーマンスを評価する際に特に重要です。 doc_uri は、レトリバーによって返されるドキュメントの主な識別子であり、地上真偽評価セットと比較できます。 評価セット (MLflow 2) を参照してください。

環境間でのデプロイ用にエージェント コードをパラメーター化する

エージェント コードをパラメーター化して、異なる環境で同じエージェント コードを再利用できます。

パラメーターは、Python ディクショナリまたは .yaml ファイルで定義するキーと値のペアです。

コードを構成するには、Python ディクショナリまたは ModelConfigfile を使用して.yaml を作成します。 ModelConfig は、柔軟な構成管理を可能にするキー値パラメーターのセットです。 たとえば、開発中にディクショナリを使用し、運用環境のデプロイと CI/CD 用の .yaml ファイルに変換できます。

ModelConfigの詳細については、MLflow のドキュメントを参照してください。

ModelConfigの例を次に示します。

llm_parameters:
  max_tokens: 500
  temperature: 0.01
model_serving_endpoint: databricks-meta-llama-3-3-70b-instruct
vector_search_index: ml.docs.databricks_docs_index
prompt_template: 'You are a hello world bot. Respond with a reply to the user''s
  question that indicates your prompt template came from a YAML file. Your response
  must use the word "YAML" somewhere. User''s question: {question}'
prompt_template_input_vars:
  - question

エージェント コードでは、 .yaml ファイルまたはディクショナリから既定の (開発) 構成を参照できます。

import mlflow
# Example for loading from a .yml file
config_file = "configs/hello_world_config.yml"
model_config = mlflow.models.ModelConfig(development_config=config_file)

# Example of using a dictionary
config_dict = {
    "prompt_template": "You are a hello world bot. Respond with a reply to the user's question that is fun and interesting to the user. User's question: {question}",
    "prompt_template_input_vars": ["question"],
    "model_serving_endpoint": "databricks-meta-llama-3-3-70b-instruct",
    "llm_parameters": {"temperature": 0.01, "max_tokens": 500},
}

model_config = mlflow.models.ModelConfig(development_config=config_dict)

# Use model_config.get() to retrieve a parameter value
# You can also use model_config.to_dict() to convert the loaded config object
# into a dictionary
value = model_config.get('sample_param')

次に、エージェントのログを記録するときに、model_config パラメーターをlog_modelに指定して、ログ記録したエージェントを読み込む際に使用するカスタムパラメーターのセットを指定します。 MLflow のドキュメント - ModelConfig を参照してください。

ストリーミングエラーの伝播

モザイク AI は、databricks_output.errorの下の最後のトークンでのストリーミング中に発生したエラーを伝播します。 このエラーを適切に処理して表示するのは、呼び出し元のクライアント次第です。

{
  "delta": …,
  "databricks_output": {
    "trace": {...},
    "error": {
      "error_code": BAD_REQUEST,
      "message": "TimeoutException: Tool XYZ failed to execute."
    }
  }
}

次のステップ