Esercitazione: Indicizzare dati di grandi dimensioni da Apache Spark usando SynapseML e Ricerca di intelligenza artificiale di Azure

Questa esercitazione su Ricerca intelligenza artificiale di Azure illustra come indicizzare ed eseguire query su dati di grandi dimensioni caricati da un cluster Spark. Configurare un notebook di Jupyter che esegue le azioni seguenti:

  • Caricare vari moduli (fatture) in un frame di dati in una sessione di Apache Spark
  • Analizzarli per determinare le relative funzionalità
  • Assemblare l'output risultante in una struttura di dati tabulari
  • Scrivere l'output in un indice di ricerca ospitato in Ricerca di intelligenza artificiale di Azure
  • Esplorare ed eseguire query sul contenuto creato

Questa esercitazione usa una dipendenza da SynapseML, una libreria open source che supporta l'apprendimento automatico in parallelo massiccio su Big Data. In SynapseML, l'indicizzazione della ricerca e l'apprendimento automatico vengono esposte tramite trasformatori che eseguono attività specializzate. I trasformatori sfruttano un'ampia gamma di funzionalità di intelligenza artificiale. In questo esercizio usare le API AzureSearchWriter per l'analisi e l'arricchimento tramite intelligenza artificiale.

Anche se Ricerca intelligenza artificiale di Azure include l'arricchimento nativo dell'intelligenza artificiale, questa esercitazione illustra come accedere alle funzionalità di intelligenza artificiale all'esterno di Ricerca intelligenza artificiale di Azure. Usando SynapseML invece di indicizzatori o competenze, non si è soggetti ai limiti dei dati o ad altri vincoli associati a tali oggetti.

Suggerimento

Guardare un breve video di questa demo all'indirizzo https://www.youtube.com/watch?v=iXnBLwp7f88. Il video si espande in questa esercitazione con altri passaggi e oggetti visivi.

Prerequisiti

Sono necessarie la synapseml libreria e diverse risorse di Azure. Se possibile, usare la stessa sottoscrizione e la stessa area per le risorse di Azure e inserire tutti gli elementi in un unico gruppo di risorse per una pulizia semplice in un secondo momento. I collegamenti seguenti sono relativi alle installazioni del portale. I dati di esempio sono importati da un sito pubblico.

1 Questo collegamento si risolve in un'esercitazione per il caricamento del pacchetto.

2 È possibile usare il livello di ricerca gratuito per indicizzare i dati di esempio, ma scegliere un livello superiore se i volumi di dati sono di grandi dimensioni. Per i livelli fatturabili, specificare la chiave API di ricerca nel passaggio Configurare le dipendenze .

3 Questa esercitazione usa Azure AI Document Intelligence e Azure AI Traduttore. Nelle istruzioni seguenti specificare una chiave multiservizio e l'area. La stessa chiave funziona per entrambi i servizi.

4 In questa esercitazione Azure Databricks offre la piattaforma di calcolo Spark. Sono state usate le istruzioni del portale per configurare l'area di lavoro.

Nota

Tutte le risorse di Azure precedenti supportano le funzionalità di sicurezza in Microsoft Identity Platform. Per semplicità, questa esercitazione presuppone l'autenticazione basata su chiave, usando endpoint e chiavi copiati dalle pagine del portale di ogni servizio. Se si implementa questo flusso di lavoro in un ambiente di produzione o si condivide la soluzione con altri utenti, ricordarsi di sostituire le chiavi hardcoded con chiavi di sicurezza o crittografate integrate.

Passaggio 1: Creare un cluster Spark e un notebook

In questa sezione creare un cluster, installare la synapseml libreria e creare un notebook per eseguire il codice.

  1. In portale di Azure trovare l'area di lavoro di Azure Databricks e selezionare Avvia area di lavoro.

  2. Nel menu a sinistra selezionare Calcolo.

  3. Selezionare Crea calcolo.

  4. Accettare la configurazione predefinita. La creazione del cluster richiede alcuni minuti.

  5. Installare la synapseml libreria dopo la creazione del cluster:

    1. Selezionare Librerie nelle schede nella parte superiore della pagina del cluster.

    2. Selezionare Installa nuovo.

      Screenshot del comando Installa nuovo.

    3. Selezionare Maven.

    4. In Coordinate immettere com.microsoft.azure:synapseml_2.12:1.0.4

    5. Selezionare Installa.

      Screenshot della specifica del pacchetto Maven.

  6. Nel menu a sinistra selezionare Crea>notebook.

    Screenshot del comando Crea notebook.

  7. Assegnare un nome al notebook, selezionare Python come linguaggio predefinito e selezionare il cluster con la synapseml libreria.

  8. Creare sette celle consecutive. Incollare il codice in ognuno di essi.

    Screenshot del notebook con celle segnaposto.

Passaggio 2: Configurare le dipendenze

Incollare il codice seguente nella prima cella del notebook.

Sostituire i segnaposto con endpoint e chiavi di accesso per ogni risorsa. Specificare un nome per un nuovo indice di ricerca. Non sono necessarie altre modifiche, quindi eseguire il codice quando si è pronti.

Questo codice importa più pacchetti e configura l'accesso alle risorse di Azure usate in questo flusso di lavoro.

import os
from pyspark.sql.functions import udf, trim, split, explode, col, monotonically_increasing_id, lit
from pyspark.sql.types import StringType
from synapse.ml.core.spark import FluentAPI

cognitive_services_key = "placeholder-cognitive-services-multi-service-key"
cognitive_services_region = "placeholder-cognitive-services-region"

search_service = "placeholder-search-service-name"
search_key = "placeholder-search-service-api-key"
search_index = "placeholder-search-index-name"

Passaggio 3: Caricare i dati in Spark

Incollare il codice seguente nella seconda cella. Non sono necessarie modifiche, quindi eseguire il codice quando si è pronti.

Questo codice carica alcuni file esterni da un account di archiviazione di Azure. I file sono diverse fatture e vengono letti in un frame di dati.

def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache())
    
display(df2)

Passaggio 4: Aggiungere funzionalità di intelligence per i documenti

Incollare il codice seguente nella terza cella. Non sono necessarie modifiche, quindi eseguire il codice quando si è pronti.

Questo codice carica il trasformatore AnalyzeInvoices e passa un riferimento al frame di dati contenente le fatture. Chiama il modello di fattura predefinito di Intelligence per i documenti di Intelligenza artificiale di Azure per estrarre informazioni dalle fatture.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (AnalyzeInvoices()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache())

display(analyzed_df)

L'output di questo passaggio dovrebbe essere simile allo screenshot successivo. Si noti che l'analisi dei moduli viene compressa in una colonna densamente strutturata, che è difficile da usare. La trasformazione successiva risolve questo problema analizzando la colonna in righe e colonne.

Screenshot dell'output AnalyzeInvoices.

Passaggio 5: Ristrutturare l'output di Intelligence per i documenti

Incollare il codice seguente nella quarta cella ed eseguirlo. Non sono necessarie modifiche.

Questo codice carica FormOntologyLearner, un trasformatore che analizza l'output dei trasformatori di Document Intelligence e deduce una struttura di dati tabulari. L'output di AnalyzeInvoices è dinamico e varia in base alle funzionalità rilevate nel contenuto. Inoltre, il trasformatore consolida l'output in una singola colonna. Poiché l'output è dinamico e consolidato, è difficile usare nelle trasformazioni downstream che richiedono una struttura maggiore.

FormOntologyLearner estende l'utilità del trasformatore AnalyzeInvoices cercando modelli che possono essere usati per creare una struttura di dati tabulari. L'organizzazione dell'output in più colonne e righe rende il contenuto utilizzabile in altri trasformatori, ad esempio AzureSearchWriter.

from synapse.ml.cognitive import FormOntologyLearner

itemized_df = (FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*").select("*", explode(col("Items")).alias("Item"))
    .drop("Items").select("Item.*", "*").drop("Item"))

display(itemized_df)

Si noti che questa trasformazione esegue il recast dei campi annidati in una tabella, che consente le due trasformazioni successive. Questo screenshot viene tagliato per brevità. Se si segue nel proprio notebook, sono presenti 19 colonne e 26 righe.

Screenshot dell'output FormOntologyLearner.

Passaggio 6: Aggiungere traduzioni

Incollare il codice seguente nella quinta cella. Non sono necessarie modifiche, quindi eseguire il codice quando si è pronti.

Questo codice carica Translate, un trasformatore che chiama il servizio Azure AI Traduttore nei servizi di intelligenza artificiale di Azure. Il testo originale, che è in inglese nella colonna "Descrizione", viene tradotto in varie lingue. Tutto l'output viene consolidato nella matrice "output.translations".

from synapse.ml.cognitive import Translate

translated_df = (Translate()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache())

display(translated_df)

Suggerimento

Per verificare la presenza di stringhe tradotte, scorrere fino alla fine delle righe.

Screenshot dell'output della tabella che mostra la colonna Traduzioni.

Passaggio 7: Aggiungere un indice di ricerca con AzureSearchWriter

Incollare il codice seguente nella sesta cella e quindi eseguirlo. Non sono necessarie modifiche.

Questo codice carica AzureSearchWriter. Usa un set di dati tabulare e deduce uno schema dell'indice di ricerca che definisce un campo per ogni colonna. Poiché la struttura delle traduzioni è una matrice, è articolata nell'indice come raccolta complessa con sottocampi per ogni traduzione linguistica. L'indice generato ha una chiave del documento e usa i valori predefiniti per i campi creati usando l'API REST Crea indice.

from synapse.ml.cognitive import *

(translated_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    ))

È possibile controllare le pagine del servizio di ricerca in portale di Azure per esplorare la definizione dell'indice creata da AzureSearchWriter.

Nota

Se non è possibile usare l'indice di ricerca predefinito, è possibile fornire una definizione personalizzata esterna in JSON, passandone l'URI come stringa nella proprietà "indexJson". Generare prima l'indice predefinito in modo da conoscere i campi da specificare e quindi seguire con le proprietà personalizzate, ad esempio se sono necessari analizzatori specifici.

Passaggio 8: Eseguire una query sull'indice

Incollare il codice seguente nella settima cella e quindi eseguirlo. Non sono necessarie modifiche, ad eccezione del fatto che si potrebbe voler variare la sintassi o provare altri esempi per esplorare ulteriormente il contenuto:

Nessun trasformatore o modulo che genera query. Questa cella è una semplice chiamata all'API REST Cerca documenti.

Questo particolare esempio cerca la parola "porta" ("search": "door"). Restituisce anche un "conteggio" del numero di documenti corrispondenti e seleziona solo il contenuto dei campi "Descrizione" e "Traduzioni" per i risultati. Per visualizzare l'elenco completo dei campi, rimuovere il parametro "select".

import requests

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2020-06-30".format(search_service, search_index)
requests.post(url, json={"search": "door", "count": "true", "select": "Description, Translations"}, headers={"api-key": search_key}).json()

Lo screenshot seguente mostra l'output della cella per lo script di esempio.

Screenshot dei risultati della query che mostra il conteggio, la stringa di ricerca e i campi restituiti.

Pulire le risorse

Quando si lavora nella propria sottoscrizione, alla fine di un progetto è opportuno rimuovere le risorse che non sono più necessarie. Le risorse che rimangono in esecuzione hanno un costo. È possibile eliminare risorse singole oppure gruppi di risorse per eliminare l'intero set di risorse.

Per trovare e gestire le risorse nel portale, usare il collegamento Tutte le risorse o Gruppi di risorse nel riquadro di spostamento a sinistra.

Passaggi successivi

In questa esercitazione è stato illustrato il trasformatore AzureSearchWriter in SynapseML, un nuovo modo per creare e caricare indici di ricerca in Ricerca di intelligenza artificiale di Azure. Il trasformatore accetta json strutturato come input. FormOntologyLearner può fornire la struttura necessaria per l'output prodotto dai trasformatori di Document Intelligence in SynapseML.

Come passaggio successivo, esaminare le altre esercitazioni di SynapseML che producono contenuto trasformato da esplorare tramite Ricerca di intelligenza artificiale di Azure: