How to implement some Python functions with an Azure Data Factory activity?

Mohsen Akhavan 831 Reputation points
2021-07-25T09:16:00.743+00:00

Scenario description:
I have an Azure Service Bus and receive data on a topic.
I also have an Azure Function (Service Bus topic trigger): when a message is received on the Service Bus, this function runs several processing steps on the message (see the code below).
The steps are (details of this code):

  1. Receive a message and convert it to JSON
  2. Check whether the received message is valid
  3. Check an additional condition on the received message
  4. If the above condition is TRUE:
  5. Create an array from the value of a field in the received message
  6. Run feature extraction on the output of step 5
  7. Run normalization on the output of step 6
  8. Run classification on the output of step 7 and add labels to the received message
  9. Insert the output of step 8 into the database

Now, I want to know how I can implement and run these functions (steps) as a pipeline with Data Factory activities (or any other guidance and suggestions about this scenario).

My code is:

import logging
import json
import pickle
import statistics
import config
import psycopg2
import pandas as pd
import numpy as np
import azure.functions as func


def main(message: func.ServiceBusMessage):

    connection_db = psycopg2.connect(
        f"host={config.database_url} dbname=developer user={config.database_username} password={config.database_password}")
    cursor_connection = connection_db.cursor()

    """
    this functions validate and filters data with the folloeing criteria:
    message_type==50
    logical_id=='BLOCK'
    """
    message_body = message.get_body().decode("utf-8")
    message_body = message_body.replace(";", ",")
    message_json = json.loads(message_body)
    print("Json Converted")
    if message_json['error'] == {} and message_json['MSG_TYPE_TAG'] != '':
        logging.info("Data is Valid")
    else:
        logging.info("Data Not Valid")

    if int(message_json['MSG_TYPE_TAG']) == 50 and message_json['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['logical_name'] == 'BLOCK':
        message_filtered = message_json

        """
        this functions makes one array from the recieved array data
        """

        def _create_one_array(message_filtered):
            acceleration_array_of_all = []
            temp_array = message_filtered['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['acceleration_array']
            for value in temp_array:
                acceleration_array_of_all.append(value)
            return acceleration_array_of_all

        """
        features extraction functions
        """

        def percent_above_mean(acceleration_array_list):
            percent_above_mean = 0
            mean = np.mean(acceleration_array_list)
            for i in acceleration_array_list:
                if i > mean:
                    percent_above_mean += 1
            return percent_above_mean/len(acceleration_array_list)

        def variation_from_mean(acceleration_array_list):
            variation_from_mean = 0
            mean = np.mean(acceleration_array_list)
            for value in acceleration_array_list:
                variation_from_mean = variation_from_mean+abs(value-mean)
            return variation_from_mean/len(acceleration_array_list)

        def _feature_extraction(acceleration_array):
            feature = dict()
            feature['mean'] = np.mean(acceleration_array)
            feature['max'] = max(acceleration_array)
            feature['min'] = min(acceleration_array)
            feature['std'] = np.std(acceleration_array)
            feature['median'] = statistics.median(acceleration_array)
            feature['L1'] = sum(list(map(abs, acceleration_array)))
            feature['MAD'] = pd.Series(acceleration_array).mad()  # mean absolute deviation
            feature['percent_above_mean'] = percent_above_mean(
                acceleration_array)
            feature['variation_from_mean'] = variation_from_mean(
                acceleration_array)
            features_dataframe = pd.DataFrame(feature, index=[0])
            return features_dataframe

        def _normalization(df):
            # Load the pre-fitted scaler and assign the transformed values back:
            # transform() returns a new array and does not modify df in place.
            scaler = pickle.load(open('scaler.sav', 'rb'))
            df[df.columns] = scaler.transform(df)
            return df

        """
        classification function
        """

        def _classification_lable(normalized_features):
            classifier = pickle.load(
                open('ExtraTreesClassifier.sav', 'rb'))
            prediction = dict()
            label = classifier.predict(normalized_features).tolist()[0]
            if label == 0:
                prediction['label'] = 'Hard'
            else:
                prediction['label'] = 'Easy'
            probablity = classifier.predict_proba(normalized_features)
            prediction['probability'] = round(max(probablity[0]), 2)
            return prediction

        def _classification(normalized_features):
            label = _classification_lable(normalized_features)
            return label

        acceleration_array = _create_one_array(message_filtered)
        extracted_features = _feature_extraction(acceleration_array)
        normalized_features = _normalization(extracted_features)
        label = _classification(normalized_features)
        logging.info('functions done')

        """
        Insert to database
        """
        message_final = {**message_filtered, **
                         message_filtered['LOG_TAG']}
        del message_final['error']
        del message_final['LOG_TAG']
        del message_final['acceleration_array']

        message_final['label'] = []
        message_final['probability'] = []
        message_final['label'] = label['label']
        message_final['probability'] = label['probability']
        cursor_connection.execute(
            '''INSERT into dci_output_lable VALUES (%(MSG_TYPE_TAG)s , %(ATTACHED_DEVICE_SERIAL_NUMBER_TAG)s, %(date_time)s , %(name)s , %(number)s , %(sequence)s , %(label)s , %(probability)s);''', message_final)
        connection_db.commit()
        logging.info("Insert to database done")

    else:
        logging.info(" Input data isn't BLOCKS")

1 answer

  1. ShaikMaheer-MSFT 38,546 Reputation points Microsoft Employee Moderator
    2021-07-26T06:47:04.9+00:00

    Hi @Mohsen Akhavan ,

    Thank you for posting your query in Microsoft Q&A Platform.

    We can leverage the "Azure Function" activity to run your Azure Function in an Azure Data Factory pipeline.

    The Azure Function activity allows you to run Azure Functions in a Data Factory pipeline. To run an Azure Function, you need to create a linked service connection and an activity that specifies the Azure Function that you plan to execute.
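
    For reference, here is a minimal sketch of what such an activity definition could look like inside the pipeline JSON, written here as a Python dict. All names in it (RunProcessingFunction, ProcessMessage, AzureFunctionLinkedService, the message pipeline parameter) are placeholders, not values from your setup:

    # Sketch of an Azure Function activity for an ADF pipeline definition.
    # Every name below is a placeholder; replace with your own resources.
    azure_function_activity = {
        "name": "RunProcessingFunction",
        "type": "AzureFunctionActivity",
        "linkedServiceName": {
            # Linked service pointing at your function app (URL + function key)
            "referenceName": "AzureFunctionLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
            "functionName": "ProcessMessage",  # the function to execute
            "method": "POST",
            # Pass the message payload through a pipeline parameter
            "body": "@string(pipeline().parameters.message)"
        }
    }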

    Please note: the return type of the Azure Function has to be a valid JObject, i.e. a JSON object (a JArray or a primitive value is not accepted).

    Kindly go through the documentation below to learn how to use the Azure Function activity and for other details:
    https://learn.microsoft.com/en-us/azure/data-factory/control-flow-azure-function-activity

    By the way, you already mentioned that your Azure Function uses a "Service Bus Topic" trigger, which means that whenever you receive data on your topic, your Azure Function will already run automatically. So I am unable to understand why you want to run it again using an ADF pipeline. If you really want to run the same Azure Function from an ADF pipeline explicitly, then you can consider giving it an HTTP trigger type and running it using the Azure Function activity or a Web activity, as in the sketch below.
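
    If you go the HTTP trigger route, a minimal sketch of the wrapper could look like the following. Here, process_message is a hypothetical helper standing in for the steps already in your main (validation, feature extraction, normalization, classification, database insert); the sketch also shows how to return a JSON object, which satisfies the JObject requirement mentioned above:

    import json
    import logging
    import azure.functions as func


    def process_message(message_json):
        # Hypothetical stand-in for the existing steps: validation, feature
        # extraction, normalization, classification, and the database insert.
        # Shown as a stub here so the sketch is self-contained.
        return {"label": "Easy", "probability": 0.0}


    def main(req: func.HttpRequest) -> func.HttpResponse:
        # Read the message from the HTTP request body instead of a
        # Service Bus topic trigger.
        message_json = req.get_json()
        prediction = process_message(message_json)
        logging.info("Processing done")

        # Return a JSON object (not an array), since the Azure Function
        # activity in Data Factory requires a valid JObject response.
        return func.HttpResponse(
            json.dumps(prediction),
            mimetype="application/json",
            status_code=200,
        )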

    Hope this will help. Thank you.

