Hi
I'm dealing with a huge number of reviews (approx. 300k) stored in many single-column CSV files (one per market) in the Data Lake. As they're in many different languages, my goal is to translate them and then run sentiment analysis on the translated sentences.
As there are request limits (100 documents per request for Translator and 10 for Sentiment Analysis), I needed to split the data into chunks and loop over them. The code below works just fine, but it takes ages for so many sentences. Is there any way to speed this process up (multiprocessing etc.? - I've put a rough sketch of what I had in mind below the code) and/or optimize it to reduce the cost? A single run on the standard pricing tier is approx. 600 EUR, which is extremely high compared with AWS Comprehend, for example.
import time
import uuid
from datetime import date
from io import StringIO

import pandas as pd
import requests
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential
from azure.storage.blob import ContainerClient
# Starting timer
t0 = time.time()
###################################################################################################
# #
# Container for input data #
# #
###################################################################################################
# Necessary credentials
STORAGEACCOUNTURL='https://myblob.blob.core.windows.net'
STORAGEACCOUNTKEY= 'key'
DATA_VERSION = date.today().isoformat()
# Container with input data ('input' container in Data Lake)
input_container = ContainerClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY, container_name='input')
# Names of all files (blobs) in the input container
input_blob_names = [blob.name for blob in input_container.list_blobs()]
# Blob clients for these files
input_blob_clients = [input_container.get_blob_client(name) for name in input_blob_names]
# Downloading blobs
t1 = time.time()
downloaded_input_blobs = [blob.download_blob() for blob in input_blob_clients]
t2 = time.time()
print(f'It took {t2 - t1} seconds to download the input files.')
# Read each downloaded blob as a single-column CSV
input_df_list = [pd.read_csv(StringIO(blob.content_as_text()), header=None, sep=';', names=['sentence'])
                 for blob in downloaded_input_blobs]
# Dataframe
df = pd.concat(input_df_list, ignore_index=True)
# Request payload for the Translator: one {'text': ...} dict per sentence
phrases = [{'text': sentence} for sentence in df['sentence']]
###################################################################################################
# #
# Translation if necessary #
# #
###################################################################################################
# Text service key and endpoint
translator_subscription_key = 'key'
translator_endpoint = 'https://api.cognitive.microsofttranslator.com'
# Adding the location, also known as region. The default is global.
# This is required if using a Cognitive Services resource.
location = 'westeurope'
path = '/translate'
constructed_url = translator_endpoint + path
params = {
'api-version': '3.0',
'to': ['en']
}
headers = {
'Ocp-Apim-Subscription-Key': translator_subscription_key,
'Ocp-Apim-Subscription-Region': location,
'Content-type': 'application/json',
'X-ClientTraceId': str(uuid.uuid4())
}
# Translator accepts at most 100 text elements per request,
# so the phrases need to be split into 100-sized chunks.
chunk_translator_size = 100
chunked_translator_list = [phrases[i:i + chunk_translator_size]
                           for i in range(0, len(phrases), chunk_translator_size)]
# Translate chunk by chunk (one POST request per chunk)
translates = []
for chunk in chunked_translator_list:
    response = requests.post(constructed_url, params=params, headers=headers, json=chunk)
    translates.append(response.json())
# Extract detected language, detection score and English translation from the responses
sentences = df['sentence'].tolist()
languages = [item['detectedLanguage']['language'] for chunk in translates for item in chunk]
scores = [item['detectedLanguage']['score'] for chunk in translates for item in chunk]
texts_english = [item['translations'][0]['text'] for chunk in translates for item in chunk]
# DataFrame with translations
translations_df = pd.DataFrame({'sentence': sentences,
'language': languages,
'score': scores,
'text_english': texts_english,
})
###################################################################################################
# #
# Sentiment Analysis #
# #
###################################################################################################
# Language service key and endpoint
language_key = 'key'
language_endpoint = 'https://westeurope.api.cognitive.microsoft.com/'
# Authenticate the client using your key and endpoint
def authenticate_client():
ta_credential = AzureKeyCredential(language_key)
text_analytics_client = TextAnalyticsClient(
endpoint=language_endpoint,
credential=ta_credential)
return text_analytics_client
client = authenticate_client()
# English texts to be sent to the Language service
en_phrases = translations_df['text_english'].tolist()
# Sentiment analysis accepts at most 10 documents per request,
# so the texts need to be split into 10-sized chunks.
chunk_language_size = 10
chunked_language_list = [en_phrases[i:i + chunk_language_size]
                         for i in range(0, len(en_phrases), chunk_language_size)]
# Sentiment analysis chunk by chunk
responses = []
for chunk in chunked_language_list:
    responses.append(client.analyze_sentiment(documents=chunk))
# Extract the overall sentiment and the confidence scores from the results
sentiments = [doc.sentiment for batch in responses for doc in batch]
sentiments_score_positive = [doc.confidence_scores.positive for batch in responses for doc in batch]
sentiments_score_neutral = [doc.confidence_scores.neutral for batch in responses for doc in batch]
sentiments_score_negative = [doc.confidence_scores.negative for batch in responses for doc in batch]
# DataFrame with results (original sentence, language detection, translation, sentiment analysis)
results_df = translations_df
results_df['sentiment'] = sentiments
results_df['positive_score'] = sentiments_score_positive
results_df['neutral_score'] = sentiments_score_neutral
results_df['negative_score'] = sentiments_score_negative
###################################################################################################
# #
# Upload the results to the new Container #
# #
###################################################################################################
container_results = ContainerClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY, container_name='results')
# container_results.create_container()
# Upload the results to the output blob
t1 = time.time()
output = results_df.to_csv(encoding='utf-8', index=False)
container_results.upload_blob('results/{}/output.csv'.format(DATA_VERSION), output, overwrite=True, encoding='utf-8')
t2 = time.time()
print(f'It took {t2 - t1} seconds to upload the results file to the container.')
# Timer summary
t3 = time.time()
print(f'The whole ETL process took {t3 - t0} seconds.')
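For the "multiprocessing etc." part, this is roughly what I had in mind for the translation step - a minimal sketch that reuses chunked_translator_list, constructed_url, params and headers from the code above and just replaces the sequential loop with a thread pool (max_workers=8 is an arbitrary guess, not a tested value):

from concurrent.futures import ThreadPoolExecutor

def translate_chunk(chunk):
    # One POST per 100-element chunk, exactly as in the loop above
    response = requests.post(constructed_url, params=params, headers=headers, json=chunk)
    response.raise_for_status()
    return response.json()

with ThreadPoolExecutor(max_workers=8) as executor:
    # executor.map preserves the order of the input chunks in the results
    translates = list(executor.map(translate_chunk, chunked_translator_list))

I assume the same pattern could be applied to the analyze_sentiment calls, but I don't know whether firing requests in parallel like this is the right approach or whether it would just run into the services' throttling limits - and I don't see how it would help with the cost.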
Would really appreciate some help.