자습서: 사용자 지정 검색 엔진 및 질문 답변 시스템 만들기

이 자습서에서는 Spark 클러스터에서 로드된 큰 데이터를 인덱싱하고 쿼리하는 방법을 알아봅니다. 다음 작업을 수행하는 Jupyter Notebook을 설정합니다.

  • Apache Spark 세션의 데이터 프레임에 다양한 양식(청구서) 로드
  • 분석하여 해당 기능 확인
  • 결과 출력을 테이블 형식 데이터 구조로 어셈블
  • Azure Cognitive Search에서 호스트되는 검색 인덱스로 출력 쓰기
  • 만든 콘텐츠 탐색 및 쿼리

1 - 종속성 설정

먼저 패키지를 가져오고 이 워크플로에 사용되는 Azure 리소스에 연결합니다.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"

translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"

search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"

openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"

2 - Spark에 데이터 로드

이 코드는 데모용으로 사용되는 Azure Storage 계정에서 몇 개의 외부 파일을 로드합니다. 파일은 다양한 청구서이며 데이터 프레임으로 읽혀집니다.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache()
)

display(df2)

3 - 양식 인식 적용

이 코드는 AnalyzeInvoices 변환기를 로드하고 청구서가 포함된 데이터 프레임에 대한 참조를 전달합니다. Azure Forms Analyzer의 미리 빌드된 청구서 모델을 호출합니다.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache()
)

display(analyzed_df)

4 - 양식 인식 출력 간소화

이 코드는 Form Recognizer 변환기(Azure AI 문서 인텔리전스용)의 출력을 분석하고 테이블 형식 데이터 구조를 유추하는 변환기인 FormOntologyLearner를 사용합니다. AnalyzeInvoices의 출력은 동적이며 콘텐츠에서 검색된 기능에 따라 달라집니다.

FormOntologyLearner는 테이블 형식 데이터 구조를 만드는 데 사용할 수 있는 패턴을 찾아 AnalyzeInvoices 변환기의 유틸리티를 확장합니다. 출력을 여러 열과 행으로 구성하면 다운스트림 분석이 더 간단합니다.

from synapse.ml.cognitive import FormOntologyLearner

organized_df = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

display(organized_df)

멋진 테이블 형식 데이터 프레임을 사용하여 양식에 있는 중첩된 테이블을 일부 SparkSQL로 평면화할 수 있습니다.

from pyspark.sql.functions import explode, col

itemized_df = (
    organized_df.select("*", explode(col("Items")).alias("Item"))
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

display(itemized_df)

5 - 번역 추가

이 코드는 Azure AI 서비스에서 Azure AI 번역기 서비스를 호출하는 변환기인 Translate를 로드합니다. "설명" 열에서 영어로 된 원본 텍스트는 다양한 언어로 기계 번역됩니다. 모든 출력은 "output.translations" 배열로 통합됩니다.

from synapse.ml.cognitive import Translate

translated_df = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

display(translated_df)

6 - OpenAI🤯를 사용하여 제품을 이모지로 변환

from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split

emoji_template = """ 
  Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
  
  Two Ducks: 🦆🦆,
  Light Bulb: 💡,
  Three Peaches: 🍑🍑🍑,
  Two kitchen stoves: ♨️♨️,
  A red car: 🚗,
  A person and a cat: 🧍🐈,
  A {Description}: """

prompter = (
    OpenAIPrompt()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_name)
    .setUrl(openai_url)
    .setMaxTokens(5)
    .setPromptTemplate(emoji_template)
    .setErrorCol("error")
    .setOutputCol("Emoji")
)

emoji_df = (
    prompter.transform(translated_df)
    .withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(emoji_df.select("Description", "Emoji"))

7 - OpenAI를 사용하여 공급업체 주소 대륙 유추

continent_template = """
Which continent does the following address belong to? 

Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica. 

Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.

Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""

continent_df = (
    prompter.setOutputCol("Continent")
    .setPromptTemplate(continent_template)
    .transform(emoji_df)
    .withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))

8 - 양식에 대한 Azure Search 인덱스 만들기

from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

(
    continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    )
)

9 - 검색 쿼리 사용해 보기

import requests

search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
    search_service, search_index
)
requests.post(
    search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()

10 - Azure Search를 도구🧠로 사용할 수 있는 챗봇 빌드🔧

import json
import openai

openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"

chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:

{continent_df.columns}

If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""


def search_query_prompt(question):
    return f"""
Given the search engine above, what would you search for to answer the following question?

Question: "{question}"

Please output a json in the form of {{"query": "example_query"}}
"""


def search_result_prompt(query):
    search_results = requests.post(
        search_url, json={"search": query}, headers={"api-key": search_key}
    ).json()
    return f"""

You previously ran a search for "{query}" which returned the following results:

{search_results}

You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem. 
"""


def prompt_gpt(messages):
    response = openai.ChatCompletion.create(
        engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
    )
    return response["choices"][0]["message"]["content"]


def custom_chatbot(question):
    while True:
        try:
            query = json.loads(
                prompt_gpt(
                    [
                        {"role": "system", "content": chat_context_prompt},
                        {"role": "user", "content": search_query_prompt(question)},
                    ]
                )
            )["query"]

            return prompt_gpt(
                [
                    {"role": "system", "content": chat_context_prompt},
                    {"role": "system", "content": search_result_prompt(query)},
                    {"role": "user", "content": question},
                ]
            )
        except Exception as e:
            raise e

11 - 챗봇에 질문하기

custom_chatbot("What did Luke Diaz buy?")

12 - 빠른 더블 검사

display(
    continent_df.where(col("CustomerName") == "Luke Diaz")
    .select("Description")
    .distinct()
)