Tutorial: Index large data from Apache Spark using SynapseML and Cognitive Search

In this Azure Cognitive Search tutorial, learn how to index and query large data loaded from a Spark cluster. You'll set up a Jupyter Notebook that performs the following actions:

  • Load various forms (invoices) into a data frame in an Apache Spark session
  • Analyze them to determine their features
  • Assemble the resulting output into a tabular data structure
  • Write the output to a search index hosted in Azure Cognitive Search
  • Explore and query over the content you created

This tutorial takes a dependency on SynapseML, an open-source library that supports massively parallel machine learning over big data. In SynapseML, search indexing and machine learning are exposed through transformers that perform specialized tasks. Transformers tap into a wide range of AI capabilities. In this exercise, you'll use the AnalyzeInvoices, FormOntologyLearner, and Translate transformers for analysis and AI enrichment, and the AzureSearchWriter APIs to index the results.

Although Azure Cognitive Search has native AI enrichment, this tutorial shows you how to access AI capabilities outside of Cognitive Search. By using SynapseML instead of indexers or skills, you're not subject to data limits or other constraints associated with those objects.

Tip

Watch a short video of this demo at https://www.youtube.com/watch?v=iXnBLwp7f88. The video expands on this tutorial with more steps and visuals.

Prerequisites

You'll need the synapseml library and several Azure resources. If possible, use the same subscription and region for your Azure resources and put everything into one resource group for simple cleanup later. The following links are for portal installs. The sample data is imported from a public site.

1 This link resolves to a tutorial for loading the package.

2 You can use the free tier to index the sample data, but choose a higher tier if your data volumes are large. For non-free tiers, you'll need to provide the search API key in the Set up dependencies step further on.

3 This tutorial uses Azure Form Recognizer and Azure Translator. In the instructions below, you'll provide a Cognitive Services multi-service key and the region, and it will work for both services.

4 This link resolves to an Azure Databricks tutorial. Azure Databricks provides the Spark computing platform. The Azure Databricks article listed in the prerequisites includes multiple steps. For this tutorial, follow only the instructions in "Create a workspace".

Note

All of the above Azure resources support security features in the Microsoft Identity platform. For simplicity, this tutorial assumes key-based authentication, using endpoints and keys copied from the portal pages of each service. If you implement this workflow in a production environment, or share the solution with others, remember to replace hard-coded keys with integrated security or encrypted keys.

1 - Create a Spark cluster and notebook

In this section, you'll create a cluster, install the synapseml library, and create a notebook to run the code.

  1. In the Azure portal, find your Azure Databricks workspace and select Launch workspace.

  2. On the left menu, select Compute.

  3. Select Create cluster.

  4. Give the cluster a name, accept the default configuration, and then create the cluster. It takes several minutes to create the cluster.

  5. Install the synapseml library after the cluster is created:

    1. Select Libraries from the tabs at the top of the cluster's page.

    2. Select Install new.

      Screenshot of the Install New command.

    3. Select Maven.

    4. In Coordinates, enter com.microsoft.azure:synapseml_2.12:0.10.0

    5. Select Install.

      Screenshot of Maven package specification.

  6. On the left menu, select Create > Notebook.

    Screenshot of the Create Notebook command.

  7. Give the notebook a name, select Python as the default language, and select the cluster that has the synapseml library.

  8. Create seven consecutive cells. You'll paste code into each one.

    Screenshot of the notebook with placeholder cells.

2 - Set up dependencies

Paste the following code into the first cell of your notebook. Replace the placeholders with endpoints and access keys for each resource. No other modifications are required, so run the code when you're ready.

This code imports multiple packages and sets up access to the Azure resources used in this workflow.

import os
from pyspark.sql.functions import udf, trim, split, explode, col, monotonically_increasing_id, lit
from pyspark.sql.types import StringType
from synapse.ml.core.spark import FluentAPI

cognitive_services_key = "placeholder-cognitive-services-multi-service-key"
cognitive_services_region = "placeholder-cognitive-services-region"

search_service = "placeholder-search-service-name"
search_key = "placeholder-search-service-api-key"
search_index = "placeholder-search-index-name"
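The note in the prerequisites recommends replacing hard-coded keys before you share or deploy this workflow. One possible approach, shown here only as a minimal sketch, is to read each value from an environment variable and fall back to the placeholder when the variable isn't set. The helper `read_setting` and the variable names (`COGNITIVE_SERVICES_KEY`, `SEARCH_KEY`, and so on) are hypothetical and not part of the tutorial; use whatever secret mechanism your environment provides.

```python
import os


def read_setting(name, default):
    """Return environment variable `name`, or `default` if it is unset or blank."""
    value = os.environ.get(name, "").strip()
    return value if value else default


# Hypothetical variable names; each falls back to the tutorial's placeholder.
cognitive_services_key = read_setting(
    "COGNITIVE_SERVICES_KEY", "placeholder-cognitive-services-multi-service-key")
cognitive_services_region = read_setting(
    "COGNITIVE_SERVICES_REGION", "placeholder-cognitive-services-region")

search_service = read_setting("SEARCH_SERVICE", "placeholder-search-service-name")
search_key = read_setting("SEARCH_KEY", "placeholder-search-service-api-key")
search_index = read_setting("SEARCH_INDEX", "placeholder-search-index-name")
```

With this variation, the rest of the notebook is unchanged because the five variable names match the ones used in the later cells.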

3 - Load data into Spark

Paste the following code into the second cell. No modifications are required, so run the code when you're ready.

This code loads a few external files from an Azure storage account that's used for demo purposes. The files are various invoices, and they're read into a data frame.

def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache())
    
display(df2)
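To see what blob_to_url does without starting a Spark session, you can call it on a single path in plain Python. The following sketch repeats the function from the cell above so that it runs on its own. The file name `example-invoice.pdf` is a hypothetical stand-in for whatever files the container actually holds.

```python
def blob_to_url(blob):
    # Same logic as the tutorial's function: split the wasbs path at "@",
    # take the container from the left part and the account host from the right.
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


# Hypothetical file name, used only to illustrate the conversion.
sample_path = (
    "wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/"
    "form_subset/example-invoice.pdf"
)

print(blob_to_url(sample_path))
# https://mmlsparkdemo.blob.core.windows.net/ignite2021/form_subset/example-invoice.pdf
```

The conversion matters because the next step passes a URL column to the AnalyzeInvoices transformer rather than a wasbs path.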

4 - Add form recognition

Paste the following code into the third cell. No modifications are required, so run the code when you're ready.

This code loads the AnalyzeInvoices transformer and passes a reference to the data frame containing the invoices. It calls the pre-built invoice model of Azure Form Recognizer.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (AnalyzeInvoices()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache())

display(analyzed_df)

The output from this step should look similar to the next screenshot. Notice how the forms analysis is packed into a densely structured column, which is difficult to work with. The next transformation resolves this issue by parsing the column into rows and columns.

Screenshot of the AnalyzeInvoices output.

5 - Restructure form recognition output

Paste the following code into the fourth cell and run it. No modifications are required.

This code loads FormOntologyLearner, a transformer that analyzes the output of Form Recognizer transformers and infers a tabular data structure. The output of AnalyzeInvoices is dynamic and varies based on the features detected in your content. Furthermore, the transformer consolidates output into a single column. Because the output is dynamic and consolidated, it's difficult to use in downstream transformations that require more structure.

FormOntologyLearner extends the utility of the AnalyzeInvoices transformer by looking for patterns that can be used to create a tabular data structure. Organizing the output into multiple columns and rows makes the content consumable in other transformers, like AzureSearchWriter.

from synapse.ml.cognitive import FormOntologyLearner

itemized_df = (FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*").select("*", explode(col("Items")).alias("Item"))
    .drop("Items").select("Item.*", "*").drop("Item"))

display(itemized_df)

Notice how this transformation recasts the nested fields into a table, which enables the next two transformations. This screenshot is trimmed for brevity. If you're following along in your own notebook, you'll have 19 columns and 26 rows.

Screenshot of the FormOntologyLearner output.
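The select, explode, and drop chain above turns one row per invoice into one row per line item, with the invoice-level columns repeated on each row. The following plain-Python sketch mimics that reshaping on dictionaries so you can see the effect without Spark. It's an illustration of the idea, not the Spark implementation. The helper `explode_items` and all sample values are hypothetical, and only the "url", "Items", and "Description" names come from the tutorial.

```python
def explode_items(rows, items_key="Items"):
    """Return one flat row per element of each row's `items_key` list.

    Like Spark's explode, rows whose list is missing or empty produce no output.
    """
    flat_rows = []
    for row in rows:
        # Invoice-level fields, minus the nested list (the .drop("Items") step).
        parent = {k: v for k, v in row.items() if k != items_key}
        for item in row.get(items_key) or []:
            # Item fields first, then parent fields (the select("Item.*", "*") step).
            flat_rows.append({**item, **parent})
    return flat_rows


# Hypothetical sample data standing in for the "extracted" columns.
invoices = [
    {"url": "https://example.com/a.pdf", "VendorName": "Contoso",
     "Items": [{"Description": "item one", "Quantity": 2},
               {"Description": "item two", "Quantity": 1}]},
    {"url": "https://example.com/b.pdf", "VendorName": "Fabrikam",
     "Items": [{"Description": "item three", "Quantity": 5}]},
]

flat = explode_items(invoices)
for row in flat:
    print(row)
```

Two invoices with three line items in total produce three rows, which is why the row count in your notebook is larger than the number of invoices loaded.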

6 - Add translations

Paste the following code into the fifth cell. No modifications are required, so run the code when you're ready.

This code loads Translate, a transformer that calls the Azure Translator service in Cognitive Services. The original text, which is in English in the "Description" column, is machine-translated into various languages. All of the output is consolidated into the "output.translations" array.

from synapse.ml.cognitive import Translate

translated_df = (Translate()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache())

display(translated_df)

Tip

To check for translated strings, scroll to the end of the rows.

Screenshot of table output, showing the Translations column.
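The expression `col("output.translations")[0]` in the cell above is easier to follow with a concrete shape in mind. The sketch below models, in plain Python, the response that the Translator service returns for one input text: a list with one item per input, each holding a "translations" list of entries with "to" and "text" fields. That the "output" column mirrors this shape is an assumption, and the text values are placeholders rather than real translations.

```python
# Assumed shape of the "output" value for one row (one input text).
# The "text" values are placeholders, not real translations.
output = [
    {
        "translations": [
            {"to": "zh-Hans", "text": "<zh-Hans translation>"},
            {"to": "fr", "text": "<fr translation>"},
            {"to": "ru", "text": "<ru translation>"},
            {"to": "cy", "text": "<cy translation>"},
        ]
    }
]

# Equivalent of col("output.translations")[0]: collect the "translations"
# field of every item, then keep the list for the first (only) input text.
translations = [item["translations"] for item in output][0]

# Optional convenience view: look up a translation by language code.
by_language = {entry["to"]: entry["text"] for entry in translations}
print(by_language["fr"])
```

Under this assumption, each row of the "Translations" column holds one entry per language passed to setToLanguage.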

7 - Add a search index with AzureSearchWriter

Paste the following code in the sixth cell and then run it. No modifications are required.

This code loads AzureSearchWriter. It consumes a tabular dataset and infers a search index schema that defines one field for each column. The translations structure is an array, so it's articulated in the index as a complex collection with subfields for each language translation. The generated index will have a document key and use the default values for fields created using the Create Index REST API.

from synapse.ml.cognitive import *

(translated_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    ))

You can check the search service pages in the Azure portal to explore the index definition created by AzureSearchWriter.

Note

If you can't use the default search index, you can provide an external custom definition in JSON, passing its URI as a string in the "indexJson" property. Generate the default index first so that you know which fields to specify, and then follow with customized properties if you need specific analyzers, for example.
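As a rough illustration of what a custom definition might contain, the following sketch builds a deliberately partial index definition in the shape used by the Create Index REST API and serializes it to JSON. The helper `build_index_definition` is hypothetical. Only two fields are shown, and the choice of the `en.microsoft` analyzer for "Description" is an example, not a recommendation from the tutorial. A working definition would need to cover every field in the generated index, including the "Translations" complex collection.

```python
import json


def build_index_definition(index_name, analyzer="en.microsoft"):
    """Return a partial index definition as a dict (illustrative only)."""
    return {
        "name": index_name,
        "fields": [
            # Document key, matching the keyCol used in the cell above.
            {"name": "DocID", "type": "Edm.String", "key": True},
            # Searchable text field with an explicitly chosen analyzer.
            {"name": "Description", "type": "Edm.String",
             "searchable": True, "analyzer": analyzer},
        ],
    }


index_json = json.dumps(
    build_index_definition("placeholder-search-index-name"), indent=2)
print(index_json)
```

Comparing this output with the index that AzureSearchWriter generates by default is a quick way to find the fields and attributes you still need to add.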

8 - Query the index

Paste the following code into the seventh cell and then run it. No modifications are required, except that you might want to vary the syntax or try more examples to further explore your content.

There's no transformer or module that issues queries. This cell is a simple call to the Search Documents REST API.

This particular example searches for the word "door" ("search": "door"). It also returns a "count" of the number of matching documents, and selects just the contents of the "Description" and "Translations" fields for the results. If you want to see the full list of fields, remove the "select" parameter.

import requests

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2020-06-30".format(search_service, search_index)
requests.post(url, json={"search": "door", "count": "true", "select": "Description, Translations"}, headers={"api-key": search_key}).json()

The following screenshot shows the cell output for the preceding script.

Screenshot of query results showing the count, search string, and return fields.
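If you want to try several queries, it can help to separate building the request from sending it. The following sketch assembles the same URL and request body as the cell above and adds an optional "top" parameter from the Search Documents REST API to limit the number of results. The helper `build_search_request` is hypothetical, and the sketch doesn't send anything over the network.

```python
def build_search_request(service, index, text, select=None, top=None,
                         api_version="2020-06-30"):
    """Return (url, body) for a Search Documents POST request."""
    url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version={}".format(
        service, index, api_version)
    body = {"search": text, "count": True}
    if select:
        # The API expects a comma-separated list of field names.
        body["select"] = ", ".join(select)
    if top is not None:
        body["top"] = top
    return url, body


url, body = build_search_request(
    "placeholder-search-service-name",
    "placeholder-search-index-name",
    "door",
    select=["Description", "Translations"],
    top=5,
)
print(url)
print(body)

# To send the request from the notebook, reuse the call from the cell above:
# requests.post(url, json=body, headers={"api-key": search_key}).json()
```

Leaving out `select` returns every retrievable field, which matches the behavior described earlier for removing the "select" parameter.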

Clean up resources

When you're working in your own subscription, at the end of a project, it's a good idea to remove the resources that you no longer need. Resources left running can cost you money. You can delete resources individually or delete the resource group to delete the entire set of resources.

You can find and manage resources in the portal, using the All resources or Resource groups link in the left-navigation pane.

Next steps

In this tutorial, you learned about the AzureSearchWriter transformer in SynapseML, which is a new way of creating and loading search indexes in Azure Cognitive Search. The transformer takes structured JSON as an input. The FormOntologyLearner can provide the necessary structure for output produced by the Form Recognizer transformers in SynapseML.

As a next step, review the other SynapseML tutorials that produce transformed content you might want to explore through Azure Cognitive Search: