Scenario description:
I have an Azure Service Bus and receive data on a topic.
I also have an Azure Function (Service Bus topic trigger): whenever a message arrives on the topic, the function runs a series of processing steps on that message (see the code below).
The function performs the following steps (details of the code below):
- Step 1: Receive a message and convert it to JSON
- Step 2: Check whether the received message is valid
- Step 3: Check a further condition on the received message
- Step 4: If the condition in step 3 is TRUE:
- Step 5: Create an array from the value of a field in the received message
- Step 6: Run feature extraction on the output of step 5
- Step 7: Run normalization on the output of step 6
- Step 8: Run classification on the output of step 7 and add the labels to the received message
- Step 9: Insert the output of step 8 into the database
Now I want to know how I can implement and run these steps as a pipeline of Data Factory activities (or any other guidance or suggestion for this scenario). To make the question concrete, I sketched below how I imagine a single step could be exposed to Data Factory.
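As far as I understand, Data Factory cannot host this Service Bus trigger directly, so my working assumption is that each step would be wrapped in an HTTP-triggered Azure Function and the steps chained with Azure Function activities, each activity passing its JSON output to the next. A minimal sketch of step 5 wrapped this way (this wrapper is only my assumption, not part of my current code):

import json
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    # A Data Factory "Azure Function" activity would POST the filtered message
    # here; the JSON response then becomes the input of the next activity
    # (feature extraction).
    message_filtered = req.get_json()
    acceleration_array = message_filtered['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['acceleration_array']
    return func.HttpResponse(
        json.dumps({'acceleration_array': list(acceleration_array)}),
        mimetype='application/json')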
My code is:
import logging
import json
import pickle
import statistics
import config
import psycopg2
import pandas as pd
import numpy as np
import azure.functions as func
def main(message: func.ServiceBusMessage):
    connection_db = psycopg2.connect(
        f"host={config.database_url} dbname=developer user={config.database_username} password={config.database_password}")
    cursor_connection = connection_db.cursor()

    # Step 1: receive the message and convert it to JSON.
    message_body = message.get_body().decode("utf-8")
    message_body = message_body.replace(";", ",")
    message_json = json.loads(message_body)
    logging.info("JSON converted")
    # Step 2: check whether the received message is valid.
    if message_json['error'] == {} and message_json['MSG_TYPE_TAG'] != '':
        logging.info("Data is valid")
    else:
        logging.info("Data not valid")

    # Steps 3-4: process the message only if message_type == 50 and
    # logical_name == 'BLOCK'.
    if int(message_json['MSG_TYPE_TAG']) == 50 and message_json['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['logical_name'] == 'BLOCK':
        message_filtered = message_json
"""
this functions makes one array from the recieved array data
"""
def _create_one_array(message_filtered):
acceleration_array_of_all = []
temp_array = message_filtered['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['acceleration_array']
for value in temp_array:
acceleration_array_of_all.append(value)
return acceleration_array_of_all
"""
features extraction functions
"""
def percent_above_mean(acceleration_array_list):
percent_above_mean = 0
mean = np.mean(acceleration_array_list)
for i in acceleration_array_list:
if i > mean:
percent_above_mean += 1
return percent_above_mean/len(acceleration_array_list)
def variation_from_mean(acceleration_array_list):
variation_from_mean = 0
mean = np.mean(acceleration_array_list)
for value in acceleration_array_list:
variation_from_mean = variation_from_mean+abs(value-mean)
return variation_from_mean/len(acceleration_array_list)
def _feature_extraction(acceleration_array):
feature = dict()
feature['mean'] = np.mean(acceleration_array)
feature['max'] = max(acceleration_array)
feature['min'] = min(acceleration_array)
feature['std'] = np.std(acceleration_array)
feature['median'] = statistics.median(acceleration_array)
feature['L1'] = sum(list(map(abs, acceleration_array)))
feature['MAD'] = pd.Series(acceleration_array).mad()
feature['percent_above_mean'] = percent_above_mean(
acceleration_array)
feature['variation_from_mean'] = variation_from_mean(
acceleration_array)
features_dataframe = pd.DataFrame(feature, index=[0])
return features_dataframe
        # Step 7: normalization.
        def _normalization(df):
            scaler = pickle.load(open('scaler.sav', 'rb'))
            # transform() returns a new array and does not modify df in place,
            # so the result must be assigned back before returning.
            df[df.columns] = scaler.transform(df)
            return df
"""
classification function
"""
def _classification_lable(normalized_features):
classifier = pickle.load(
open('ExtraTreesClassifier.sav', 'rb'))
prediction = dict()
label = classifier.predict(normalized_features).tolist()[0]
if label == 0:
prediction['label'] = 'Hard'
else:
prediction['label'] = 'Easy'
probablity = classifier.predict_proba(normalized_features)
prediction['probability'] = round(max(probablity[0]), 2)
return prediction
def _classification(normalized_features):
label = _classification_lable(normalized_features)
return label
        # Run steps 5-8 in sequence.
        acceleration_array = _create_one_array(message_filtered)
        extracted_features = _feature_extraction(acceleration_array)
        normalized_features = _normalization(extracted_features)
        label = _classification(normalized_features)
        logging.info("Functions done")
"""
Insert to database
"""
message_final = {**message_filtered, **
message_filtered['LOG_TAG']}
del message_final['error']
del message_final['LOG_TAG']
del message_final['acceleration_array']
message_final['label'] = []
message_final['probability'] = []
message_final['label'] = label['label']
message_final['probability'] = label['probability']
cursor_connection.execute(
'''INSERT into dci_output_lable VALUES (%(MSG_TYPE_TAG)s , %(ATTACHED_DEVICE_SERIAL_NUMBER_TAG)s, %(date_time)s , %(name)s , %(number)s , %(sequence)s , %(label)s , %(probability)s);''', message_final)
connection_db.commit()
logging.info("Insert to database done")
    else:
        logging.info("Input data isn't BLOCKS")