Azure Database for PostgreSQL と Python を使用して RAG アプリケーションを構築する

完了

データベースの埋め込みとベクター インデックスの準備ができたので、次は、取得を動作するアプリケーションに変換します。 目標は単純です。ユーザーの質問を受け取り、PostgreSQL から最も関連性の高いチャンクを取得し、LangChain を介して Azure OpenAI を使用してそれらのチャンクに基づき回答を生成します。

このユニットでは、 Python で RAG アプリケーションを構築する基本について説明します。 アプリケーションはデータベースに接続し、データベースで類似性検索を実行し、検索結果をモデルに渡して明確な指示を出して、幻覚を回避します。 モデルはデータベース データに基づいて適切に定義されたコンテキストを受け取るので、そのコンテキストに基づいてより正確で関連性の高い回答を生成できます。

PostgreSQL と LangChain を使用した検索拡張生成

前のユニットから思い出してください。RAG フローは、取得と生成を組み合わせた一連の手順です。 いくつかに分けてみましょう。

  1. ユーザーが質問をします。
  2. データベースは質問の埋め込みを計算し、ベクター インデックスを使用して、データベース内の最も近い一致 (上位チャンク) を検索します。
  3. 上位のチャンクは、 "このコンテキストからのみ回答する" というプロンプトでモデルに渡されます。わからない場合は、そう言います。
  4. モデルは、提供されたコンテキストに基づいて自然言語の回答を返します。

LangChain は、言語モデルを使用してアプリケーションを構築するためのフレームワークを提供します。 これにより、さまざまなデータ ソースへの接続、プロンプトの管理、応答の処理のプロセスが簡略化されます。 Azure Database for PostgreSQLPythonLangChain を組み合わせることで、関連情報を取得し、正確な応答を生成する強力な RAG アプリケーションを作成できます。 RAG アプリケーションが進化する可能性がある一方で、これらの基本原則は開発の指針となります。

このアプリケーションでは、 LangChain AzureChatOpenAI ラッパーを使用して 、Azure OpenAI サービスと対話します。

次の Python の例では、次のテーブル構造を使用します。

CREATE TABLE company_policies (
    id SERIAL PRIMARY KEY,
    title TEXT,
    policy_text TEXT,
    embedding VECTOR(1536)
);

この表では、効率的な類似性検索を可能にするために、 embedding 列にベクター インデックスが作成されていることを前提としています。

各ステップの Python コード スニペットを確認してみましょう。 ほとんどの RAG アプリケーションは、同様の構造に従います。

データベースに接続する

接続の有効期間を短くし、セキュリティで保護します。 この例では、シークレットは環境変数内にあり、接続に渡されます。

import os, psycopg2
from contextlib import contextmanager

@contextmanager
def get_conn():
    conn = psycopg2.connect(
        host=os.getenv("PGHOST"),
        user=os.getenv("PGUSER"),
        password=os.getenv("PGPASSWORD"),
        dbname=os.getenv("PGDATABASE"),
        connect_timeout=10
    )
    try:
        yield conn
    finally:
        conn.close()

このスクリプトは、データベース接続用のコンテキスト マネージャーを設定し、使用後に適切に閉じられるようにします。 実際の取得に移る時です。

関連するチャンクを取得する

ユーザーが質問を行っているので、その質問でデータベースにクエリを実行する関数が必要です。 その関数は、質問に基づいて関連するチャンクを返す必要があります。 この例では、ベクトル インデックスが コサイン距離を使用してランク付けされていることを前提としているため、クエリにはそれぞれの演算子クラス (<=>) を使用します。

def retrieve_chunks(question, top_k=5):
    sql = """
    WITH q AS (
        SELECT azure_openai.create_embeddings(%s, %s)::vector AS qvec
    )
    SELECT id, title, policy_text
    FROM company_policies, q
    ORDER BY embedding <=> q.qvec
    LIMIT %s;
    """
    params = (os.getenv("OPENAI_EMBED_DEPLOYMENT"), question, top_k)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(sql, params)
        rows = cur.fetchall()
    return [{"id": r[0], "title": r[1], "text": r[2]} for r in rows]

したがって、この関数は、ユーザーの質問に基づいてデータベースから関連するチャンクを取得します。 これらのチャンクは、後続の手順でコンテキスト対応の回答を生成するために使用されます。

LangChain を使用して回答を生成する

関連するチャンクを取得する関数が作成されたので、それらのチャンクを使用して回答を生成する必要があります。 モデルでは、取得したコンテキストのみを使用し、ポリシー タイトルを引用する必要があります。 回答生成関数を作成する時間。

from langchain_openai import AzureChatOpenAI

SYSTEM_PROMPT = """
You are a helpful assistant. Answer using ONLY the provided context.
If the answer is not in the context, say you don’t have enough information.
Cite policy titles in square brackets, e.g., [Vacation policy].
"""

def format_context(chunks):
    return "\n\n".join([f"[{c['title']}] {c['text']}" for c in chunks])

def generate_answer(question, chunks):
    llm = AzureChatOpenAI(
        azure_deployment=os.getenv("OPENAI_CHAT_DEPLOYMENT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
        temperature=0
    )
    context = format_context(chunks)
    messages = [
        ("system", SYSTEM_PROMPT),
        ("human", f"Question: {question}\nContext:\n{context}")
    ]
    return llm.invoke(messages).content

AzureChatOpenAI を使用するには:

  • (ロール、コンテンツ) タプル (またはメッセージ オブジェクト) のリストとしてメッセージを渡します。
  • env vars とコンストラクター ( AZURE_OPENAI_API_KEYAZURE_OPENAI_ENDPOINTazure_deploymentapi_version) を使用して Azure 設定を指定します。
  • 事実に基づく回答を得るための temperature=0 を維持します。 値を大きくすると創造性が向上しますが、精度が低下する可能性があります。

この関数は RAG アプリケーションの中核であり、言語モデルとの対話を処理します。 取得されたチャンクがどのように書式設定され、コンテキストに含まれているかを確認します。 さらに、システム プロンプトは、モデルが提供されたコンテキストに準拠し、間違っている可能性がある AI によって生成される応答を減らすことを目的として設計されています。 最後に、言語モデルによってメッセージが処理され、 LangChain呼び出し メソッドを使用して応答が生成されます。 invoke メソッドは書式設定されたメッセージで呼び出され、モデルの応答は自然言語テキストとして返されます。

統合する

最後に必要なのは、フル フローを実行するための単純な関数です。

def answer_question(question):
    chunks = retrieve_chunks(question)
    if not chunks:
        return "I couldn’t find relevant content in the policy store."
    return generate_answer(question, chunks)

# Quick test
print(answer_question("How many vacation days do employees get?"))

この例では、データベースから関連するチャンクを取得し、それらを使用してコンテキスト対応の回答を生成します。 この関数呼び出しは、質問から回答の生成までの完全なフローを示しています。 まず、 retrieve_chunks 関数を呼び出して関連するコンテキスト (基本的にはデータベースから返される行) を取得します。 この関数は、そのコンテキストを言語モデルと対話する generate_answer 関数に渡して、自然言語応答として最終的な回答 (コンテキストの一部として行を使用) を生成します。 完全なフローにより、取得したデータに確実に回答が接地され、より正確で信頼性の高い応答が提供されます。

重要なポイント

実用的な RAG アプリケーションでは、ユーザーの質問を受け取り、SQL に埋め込みを作成し、ベクター インデックスを使用してデータベース内の最も近い通路をフェッチし、そのコンテキストのみをモデルに提供します。 幻覚を減らすために、モデルは提供されたコンテキスト内に留まり、情報が欠落している場合にはそれを認めるように指示されています。 接続の有効期間を短くし、クエリをパラメーター化し、環境変数を介してシークレットを渡します。 実際の回答には低温を使用し、軽量の引用文献 (タイトルまたは ID) を含めて応答を追跡できるようにします。

これで、Azure Database for PostgreSQLPython を使用して、取得拡張生成 (RAG) アプリケーションを構築する方法について理解を深める必要があります。 実際のシナリオでは RAG アプリケーションがはるかに複雑になる可能性がある一方で、コア原則は変わりません。