Share via


자습서: SynapseML 및 Azure AI 검색을 사용하여 Apache Spark에서 대규모 데이터 인덱싱

이 Azure AI 검색 자습서에서는 Spark 클러스터에서 로드된 대용량 데이터를 인덱싱하고 쿼리하는 방법을 알아봅니다. 다음 작업을 수행하는 Jupyter Notebook을 설정합니다.

  • Apache Spark 세션의 데이터 프레임에 다양한 양식(청구서) 로드
  • 기능을 결정하기 위해 분석
  • 결과 출력을 표 형식 데이터 구조로 조립
  • Azure AI 검색에서 호스트되는 검색 인덱스에 출력 쓰기
  • 직접 만든 콘텐츠 탐색 및 쿼리

이 자습서는 빅 데이터에 대한 대규모 병렬 기계 학습을 지원하는 오픈 소스 라이브러리인 SynapseML에 의존합니다. SynapseML에서 검색 인덱싱 및 기계 학습은 전문 작업을 수행하는 변환기를 통해 노출됩니다. 변환기는 광범위한 AI 기능을 활용합니다. 이 연습에서는 분석 및 AI 보강을 위해 AzureSearchWriter API를 사용합니다.

Azure AI 검색에는 네이티브 AI 보강이 있지만 이 자습서에서는 Azure AI 검색 외부에서 AI 기능에 액세스하는 방법을 보여 줍니다. 인덱서 또는 기술 대신 SynapseML을 사용하면 해당 개체와 관련된 데이터 제한 또는 기타 제약 조건이 적용되지 않습니다.

https://www.youtube.com/watch?v=iXnBLwp7f88에서 이 데모의 간단한 동영상을 시청합니다. 동영상은 이 자습서에서 더 많은 단계와 시각 자료로 확장됩니다.

필수 조건

synapseml 라이브러리와 여러 Azure 리소스가 필요합니다. 가능하면 Azure 리소스에 대해 동일한 구독 및 지역을 사용하고 나중에 간단한 정리를 위해 모든 것을 하나의 리소스 그룹에 넣습니다. 다음 링크는 포털 설치용입니다. 샘플 데이터는 공용 사이트에서 가져옵니다.

1 이 링크는 패키지 로드에 대한 자습서로 연결됩니다.

2 무료 검색 계층을 사용하여 샘플 데이터의 인덱스를 생성할 수 있지만 데이터 볼륨이 큰 경우 더 높은 계층을 선택합니다. 청구 가능 계층의 경우 종속성 설정 단계에서 검색 API 키를 제공합니다.

3 이 자습서에서는 Azure AI 문서 인텔리전스 및 Azure AI 번역기를 사용합니다. 다음 지침에서는 다중 서비스 키 및 지역을 제공합니다. 두 서비스 모두에 동일한 키가 작동합니다.

4 이 자습서에서 Azure Databricks는 Spark 컴퓨팅 플랫폼을 제공합니다. 포털 지침을 사용하여 작업 영역을 설정했습니다.

참고 항목

위의 모든 Azure 리소스는 Microsoft ID 플랫폼의 보안 기능을 지원합니다. 단순성을 위해 이 자습서에서는 각 서비스의 포털 페이지에서 복사한 엔드포인트와 키를 사용하는 키 기반 인증을 가정합니다. 프로덕션 환경에서 이 워크플로를 구현하거나 솔루션을 다른 사람과 공유하는 경우 하드 코딩된 키를 통합 보안 또는 암호화된 키로 바꿔야 합니다.

1단계: Spark 클러스터 및 Notebook 만들기

이 섹션에서는 클러스터를 만들고 synapseml 라이브러리를 설치하고 코드를 실행할 Notebook을 만듭니다.

  1. Azure Portal에서 Azure Databricks 작업 영역을 찾고 작업 영역 시작을 선택합니다.

  2. 왼쪽 메뉴에서 컴퓨팅을 선택합니다.

  3. 컴퓨팅 만들기를 선택합니다.

  4. 기본 구성을 적용합니다. 클러스터를 만드는 데 몇 분 정도 걸립니다.

  5. 클러스터가 만들어진 후 synapseml 라이브러리를 설치합니다.

    1. 클러스터 페이지 상단의 탭에서 라이브러리를 선택합니다.

    2. 새로 설치를 선택합니다.

      새로 설치 명령의 스크린샷.

    3. Maven을 선택합니다.

    4. 좌표에 com.microsoft.azure:synapseml_2.12:1.0.4을 입력합니다.

    5. 설치를 선택합니다.

      Maven 패키지 사양 스크린샷.

  6. 왼쪽 메뉴에서 노트>만들기를 선택합니다.

    노트 만들기 명령의 스크린샷.

  7. Notebook에 이름을 지정하고 기본 언어로 Python을 선택한 다음 synapseml 라이브러리가 있는 클러스터를 선택합니다.

  8. 7개의 연속 셀을 만듭니다. 각 코드에 붙여넣습니다.

    자리 표시자 셀이 있는 Notebook의 스크린샷.

2단계: 종속성 설정

Notebook의 첫 번째 셀에 다음 코드를 붙여넣습니다.

자리 표시자를 각 리소스에 대한 엔드포인트 및 액세스 키로 바꿉니다. 새 검색 인덱스 이름을 제공합니다. 다른 수정이 필요하지 않으므로 준비가 되면 코드를 실행합니다.

이 코드는 여러 패키지를 가져오고 이 워크플로에서 사용되는 Azure 리소스에 대한 액세스를 설정합니다.

import os
from pyspark.sql.functions import udf, trim, split, explode, col, monotonically_increasing_id, lit
from pyspark.sql.types import StringType
from synapse.ml.core.spark import FluentAPI

cognitive_services_key = "placeholder-cognitive-services-multi-service-key"
cognitive_services_region = "placeholder-cognitive-services-region"

search_service = "placeholder-search-service-name"
search_key = "placeholder-search-service-api-key"
search_index = "placeholder-search-index-name"

3단계: Spark에 데이터 로드

다음 코드를 두 번째 셀에 붙여넣습니다. 수정이 필요하지 않으므로 준비가 되면 코드를 실행합니다.

이 코드는 Azure Storage 계정에서 몇 개의 외부 파일을 로드합니다. 파일은 다양한 청구서이며 데이터 프레임으로 읽혀집니다.

def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache())
    
display(df2)

4단계: 문서 인텔리전스 추가

세 번째 셀에 다음 코드를 붙여넣습니다. 수정이 필요하지 않으므로 준비가 되면 코드를 실행합니다.

이 코드는 AnalyzeInvoices 변환기를 로드하고 청구서가 포함된 데이터 프레임에 대한 참조를 전달합니다. Azure AI 문서 인텔리전스의 미리 빌드된 청구서 모델을 호출하여 청구서에서 정보를 추출합니다.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (AnalyzeInvoices()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache())

display(analyzed_df)

이 단계의 출력은 다음 스크린샷과 유사해야 합니다. 양식 분석이 작업하기 어려운 조밀하게 구조화된 열에 어떻게 압축되어 있는지 확인합니다. 다음 변환은 열을 행과 열로 구문 분석하여 이 문제를 해결합니다.

AnalyzeInvoices 출력의 스크린샷.

5단계: 문서 인텔리전스 출력 재구성

다음 코드를 네 번째 셀에 붙여넣고 실행합니다. 수정이 필요하지 않습니다.

이 코드는 문서 인텔리전스의 출력을 분석하고 표 형식 데이터 구조를 유추하는 변환기인 FormOntologyLearner를 로드합니다. AnalyzeInvoices의 출력은 동적이며 콘텐츠에서 검색된 기능에 따라 다릅니다. 또한 변환기는 출력을 단일 열로 통합합니다. 출력이 동적이고 통합되기 때문에 더 많은 구조가 필요한 다운스트림 변환에 사용하기 어렵습니다.

FormOntologyLearner는 표 형식 데이터 구조를 만드는 데 사용할 수 있는 패턴을 찾아 AnalyzeInvoices 변환기의 유틸리티를 확장합니다. 출력을 여러 열과 행으로 구성하면 AzureSearchWriter와 같은 다른 변환기에서 콘텐츠를 사용할 수 있습니다.

from synapse.ml.cognitive import FormOntologyLearner

itemized_df = (FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*").select("*", explode(col("Items")).alias("Item"))
    .drop("Items").select("Item.*", "*").drop("Item"))

display(itemized_df)

이 변환이 다음 두 변환을 사용하도록 설정하는 테이블로 중첩된 필드를 다시 캐스팅하는 방법에 주목합니다. 이 스크린샷은 간결함을 위해 잘립니다. 고유의 Notebook을 따라가는 경우 19개의 열과 26개의 행이 있습니다.

FormOntologyLearner 출력 스크린샷.

6단계: 번역 추가

다음 코드를 다섯 번째 셀에 붙여넣습니다. 수정이 필요하지 않으므로 준비가 되면 코드를 실행합니다.

이 코드는 Azure AI 서비스에서 Azure AI 번역기 서비스를 호출하는 변환기인 Translate를 로드합니다. "설명" 열에 영어로 된 원본 텍스트는 다양한 언어로 컴퓨터 번역됩니다. 모든 출력은 "output.translations" 배열로 통합됩니다.

from synapse.ml.cognitive import Translate

translated_df = (Translate()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache())

display(translated_df)

번역된 문자열을 확인하려면 행 끝까지 스크롤합니다.

번역 열을 보여 주는 테이블 출력의 스크린샷.

7단계: AzureSearchWriter를 사용하여 검색 인덱스 추가

여섯 번째 셀에 다음 코드를 붙여넣고 실행합니다. 수정이 필요하지 않습니다.

이 코드는 AzureSearchWriter를 로드합니다. 표 형식 데이터 세트를 사용하고 각 열에 대해 하나의 필드를 정의하는 검색 인덱스 스키마를 유추합니다. 번역 구조는 배열이므로 각 언어 번역에 대한 하위 필드가 있는 복잡한 컬렉션으로 인덱스에 연결됩니다. 만들어진 인덱스에는 문서 키가 있으며 Create Index REST API를 사용하여 만들어진 필드의 기본값을 사용합니다.

from synapse.ml.cognitive import *

(translated_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    ))

Azure Portal에서 Search Service 페이지를 확인하여 AzureSearchWriter에서 만든 인덱스 정의를 탐색할 수 있습니다.

참고 항목

기본 검색 인덱스를 사용할 수 없는 경우 "indexJson" 속성에서 해당 URI를 문자열로 전달하여 JSON에서 외부 사용자 지정 정의를 제공할 수 있습니다. 지정할 필드를 알 수 있도록 먼저 기본 인덱스를 생성한 다음, 예를 들어 특정 분석기가 필요한 경우 사용자 지정 속성을 따릅니다.

8단계: 인덱스 쿼리

다음 코드를 일곱 번째 셀에 붙여넣은 다음 실행합니다. 구문을 변경하거나 콘텐츠를 더 자세히 탐색하기 위해 더 많은 예를 시도하려는 경우를 제외하고 수정이 필요하지 않습니다.

쿼리를 발급하는 변환기나 모듈이 없습니다. 이 셀은 문서 검색 REST API에 대한 간단한 호출입니다.

이 특정 예는 "door"("search": "door")라는 단어를 검색하는 것입니다. 또한 일치하는 문서 수의 "개수"를 반환하고 결과에 대해 "설명" 및 "번역" 필드의 콘텐츠만 선택합니다. 전체 필드 목록을 보려면 "select" 매개 변수를 제거합니다.

import requests

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2020-06-30".format(search_service, search_index)
requests.post(url, json={"search": "door", "count": "true", "select": "Description, Translations"}, headers={"api-key": search_key}).json()

다음 스크린샷은 샘플 스크립트의 셀 출력을 보여 줍니다.

카운트, 검색 문자열 및 반환 필드를 보여 주는 쿼리 결과의 스크린샷.

리소스 정리

사용자 고유의 구독에서 작업하는 경우 프로젝트의 끝에서 더 이상 필요하지 않은 리소스를 제거하는 것이 좋습니다. 계속 실행되는 리소스에는 요금이 부과될 수 있습니다. 리소스를 개별적으로 삭제하거나 리소스 그룹을 삭제하여 전체 리소스 세트를 삭제할 수 있습니다.

왼쪽 탐색 창의 모든 리소스 또는 리소스 그룹 링크를 사용하여 포털에서 리소스를 찾고 관리할 수 있습니다.

다음 단계

이 자습서에서는 Azure AI 검색에서 검색 인덱스를 만들고 로드하는 새로운 방법인 SynapseML의 AzureSearchWriter 변환기에 대해 알아보았습니다. 변환기는 구조화된 JSON을 입력으로 사용합니다. FormOntologyLearner는 SynapseML의 문서 인텔리전스 변환기에서 생성된 출력에 필요한 구조를 제공할 수 있습니다.

다음 단계로 Azure AI 검색을 통해 탐색할 수 있는 변환된 콘텐츠를 생성하는 다른 SynapseML 자습서를 검토합니다.