Azure Synapse pyspark performance/parallel

Paul Beare 25 Reputation points
2023-06-23T20:57:06.26+00:00

Background
I need to process IIS logs. The original logs are standard IIS log files stored in a Customer/Year/Month/Day/Number/*.log directory structure, and they are already in my Data Lake.
Each day I first want to convert that day's log files into Parquet, grouping them into one file per customer for the day. Those files will then be further filtered into subset files for Power BI reporting.

The Issue
I have written the script below using PySpark running in a Spark pool (Small, 4 vCores / 32 GB, 3 to 18 nodes). One notebook loops through each customer and triggers another notebook to do the actual work.

The aim was to run these in parallel to reduce the running time. When everything is in one notebook (essentially the first notebook below) it takes about 2-3 hours to run, and it is not much shorter when split into two notebooks.

The first 2 lines of each log file need to be removed because the format is not great, so I re-create the files.
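For reference, a minimal sketch of dropping those leading lines with a filter instead of re-creating the files, assuming they are the standard W3C comment lines that start with '#' (log_path below is just a placeholder for one of the .log files):

from pyspark.sql.functions import col

# log_path is a hypothetical placeholder for one of the *.log files described above
lines = spark.read.text(log_path)                       # one row per line, in a column named "value"
cleaned = lines.filter(~col("value").startswith("#"))   # drop the '#'-prefixed header/comment lines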

We have around 41 customers with at least 2 web apps per customer, so about 88 items in total. For each day there could be around 16k log files.

Help Needed
How can I shorten the processing time? Our previous U-SQL process seemed to be able to do the same thing in around 10 minutes.

Any help with parallelising or otherwise speeding up the code would be appreciated. I don't think a medium or large Spark pool is needed, as the pool's memory is barely used.

Or is there a better way to do this?

Trigger Notebook

# Trigger Notebook
# Notebook Parameters
YearNumber = "2023"
MonthNumber = "06"
DayNumber = "20"

from notebookutils import mssparkutils
from pyspark.sql.functions import *

localReferenceStorageContainers_path = 'abfss://<value>@<value>.dfs.core.windows.net/ReferenceDB/Customers.parquet'
customers = spark.read.parquet(localReferenceStorageContainers_path)
customers = customers.filter(col("Active") == "1").orderBy(upper(col("CustomerName")))

customerList = []
for customer in customers.collect():
    customerList.append(customer.CustomerName)

def processNotebook(customerName):
    # Run the child notebook for this customer with a 1-hour (3600 s) timeout
    mssparkutils.notebook.run("IIS Files_ForDay", 3600, { "CustomerName": customerName, "YearNumber": YearNumber, "MonthNumber": MonthNumber, "DayNumber": DayNumber })

# Run up to 10 child notebook calls concurrently from the driver
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)

pool.map(processNotebook, customerList)
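For what it's worth, here is a sketch of the same fan-out written with concurrent.futures, so that a failing customer run is reported instead of aborting the whole pool. This is only an illustration of the pattern, not what currently runs; the notebook name, timeout and parameters are the same as above.

from concurrent.futures import ThreadPoolExecutor, as_completed

def processNotebookChecked(customerName):
    # Run the child notebook for one customer and return any error instead of raising
    try:
        mssparkutils.notebook.run("IIS Files_ForDay", 3600, { "CustomerName": customerName, "YearNumber": YearNumber, "MonthNumber": MonthNumber, "DayNumber": DayNumber })
        return customerName, None
    except Exception as e:
        return customerName, str(e)

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(processNotebookChecked, c) for c in customerList]
    for future in as_completed(futures):
        name, error = future.result()
        if error is not None:
            print(f"{name} failed: {error}")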

Processing Notebook

#IIS Files_ForDay
#Notebook Parameters
CustomerName = ""
YearNumber = ""
MonthNumber = ""
DayNumber = ""

from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import ParseException

# Create a Spark Session
spark = SparkSession.builder.appName(f"AzureBloblistToParquet").getOrCreate()

# Create Schemas
customerSchema = StructType() \
      .add("CustomerId",StringType(),True) \
      .add("CustomerName",StringType(),True) \
      .add("WebAppId",StringType(),True) \
      .add("WebApp",StringType(),True) \
      .add("StorageAccountName",StringType(),True) \
      .add("ContainerName",StringType(),True)

outputBlobSchema = StructType() \
      .add("customername", StringType(), True) \
      .add("webappname", StringType(), True) \
      .add("yearnumber", StringType(), True) \
      .add("monthnumber", StringType(), True) \
      .add("daynumber", StringType(), True) \
      .add("date",StringType(),True) \
      .add("time",StringType(),True) \
      .add("s_sitename",StringType(),True) \
      .add("cs_method",StringType(),True) \
      .add("cs_uri_stem",StringType(),True) \
      .add("cs_uri_query",StringType(),True) \
      .add("s_port",StringType(),True) \
      .add("cs_username",StringType(),True) \
      .add("c_ip",StringType(),True) \
      .add("cs_user_agent",StringType(),True) \
      .add("cs_cookie",StringType(),True) \
      .add("cs_referer",StringType(),True) \
      .add("cs_host",StringType(),True) \
      .add("sc_status",StringType(),True) \
      .add("sc_substatus",StringType(),True) \
      .add("sc_win32_status",StringType(),True) \
      .add("sc_bytes",StringType(),True) \
      .add("cs_bytes",StringType(),True) \
      .add("time_taken",StringType(),True) \
      .add("is_api",StringType(),True) \
      .add("logdate",StringType(),True) \
      .add("customer",StringType(),True) \
      .add("year",StringType(),True) \
      .add("month",StringType(),True) \
      .add("day",StringType(),True) \
      .add("hour",StringType(),True) \
      .add("minutes",StringType(),True) \
      .add("seconds",StringType(),True)

schema = StructType([
    StructField('Path', StringType(), True)
])

# Create empty DataFrames
emptyRDD = spark.sparkContext.emptyRDD()
customers_to_process = spark.createDataFrame(emptyRDD, schema= customerSchema)

localReferenceStorageContainers_path = 'abfss://<value>@<value>.dfs.core.windows.net/ReferenceDB/CustomersStructureForProcessing.parquet'
customers_to_process = spark.read.parquet(localReferenceStorageContainers_path)
customers_to_process = customers_to_process.filter(col("CustomerName") == CustomerName)

storageAccounts = customers_to_process.dropDuplicates(["StorageAccountName"]) \
    .select("StorageAccountName")

def get_file_list(ls_path):
    # Recursively walk ls_path and yield the full path of every file found
    for dir_path in mssparkutils.fs.ls(ls_path):
        if dir_path.isFile:
            yield dir_path.path
        elif dir_path.isDir and ls_path != dir_path.path:
            yield from get_file_list(dir_path.path)

lfs = []

data_lake_externalReports = "abfss://<value>@<value>.dfs.core.windows.net/ExternalImports/IISLogs"

filesForProcessingDF = spark.createDataFrame(emptyRDD, schema)
# for each storage account, get all the files for the customer that needs to be processed
for accounts in storageAccounts.collect():
    for customerWebApp in customers_to_process.filter(col("StorageAccountName") == f"{accounts.StorageAccountName}").collect():
        data_lake_externalReportsPath = f"{data_lake_externalReports}/{accounts.StorageAccountName}/{customerWebApp.ContainerName}/{customerWebApp.WebApp}/{YearNumber}/{MonthNumber}/{DayNumber}/"
        try:
            for i in get_file_list(data_lake_externalReportsPath):
                lfs.append(i)
        except Exception:
            # The path may not exist for this web app/day; skip it and carry on
            pass

filesForProcessingDF = spark.createDataFrame(lfs, StringType())
filesForProcessingDF = filesForProcessingDF.repartition(2)

# Calculate if the call is from an API
isAPIColumn = substring(lower(col("cs_uri_stem")), 1, 5) == "/api/"

def process_files(file, customerName, webApp, yearNumber, monthNumber, dayNumber):
    localdf = spark.createDataFrame(emptyRDD, schema = outputBlobSchema)
    
    try:
        # Load File into temp dataFrame
        tempDf = spark.read.format("text").load(file)
        # Remove the first 2 lines of the file
        tempDf = spark.createDataFrame(tempDf.tail(tempDf.count() - 2))

        # set the columns
        tempDf = tempDf.withColumn('customername', lit(customerName))
        tempDf = tempDf.withColumn('webappname', lit(webApp))
        tempDf = tempDf.withColumn('yearnumber', lit(yearNumber))
        tempDf = tempDf.withColumn('monthnumber', lit(monthNumber))
        tempDf = tempDf.withColumn('daynumber', lit(dayNumber))
        # Split the raw, space-delimited line once and pull the W3C fields out by position
        parts = split(col("value"), ' ')
        tempDf = tempDf.withColumn('date', parts.getItem(0))
        tempDf = tempDf.withColumn('time', parts.getItem(1))
        tempDf = tempDf.withColumn('s_sitename', parts.getItem(2))
        tempDf = tempDf.withColumn('cs_method', parts.getItem(3))
        tempDf = tempDf.withColumn('cs_uri_stem', parts.getItem(4))
        tempDf = tempDf.withColumn('cs_uri_query', parts.getItem(5))
        tempDf = tempDf.withColumn('s_port', parts.getItem(6))
        tempDf = tempDf.withColumn('cs_username', parts.getItem(7))
        tempDf = tempDf.withColumn('c_ip', parts.getItem(8))
        tempDf = tempDf.withColumn('cs_user_agent', parts.getItem(9))
        tempDf = tempDf.withColumn('cs_cookie', parts.getItem(10))
        tempDf = tempDf.withColumn('cs_referer', parts.getItem(11))
        tempDf = tempDf.withColumn('cs_host', parts.getItem(12))
        tempDf = tempDf.withColumn('sc_status', parts.getItem(13))
        tempDf = tempDf.withColumn('sc_substatus', parts.getItem(14))
        tempDf = tempDf.withColumn('sc_win32_status', parts.getItem(15))
        tempDf = tempDf.withColumn('sc_bytes', parts.getItem(16))
        tempDf = tempDf.withColumn('cs_bytes', parts.getItem(17))
        tempDf = tempDf.withColumn('time_taken', parts.getItem(18))

        # remove the full, undelimited "value" column
        tempDf = tempDf.drop("value")
        
        # add additional columns
        tempDf = tempDf.withColumn('is_api', isAPIColumn)
        tempDf = tempDf.withColumn("logdate", col("date"))
        tempDf = tempDf.withColumn("customer", col("customername"))
        tempDf = tempDf.withColumn("year", col("yearnumber"))
        tempDf = tempDf.withColumn("month", col("monthnumber"))
        tempDf = tempDf.withColumn("day", col("daynumber"))
        tempDf = tempDf.withColumn("hour", hour(col("time")))
        tempDf = tempDf.withColumn("minutes", minute(col("time")))
        tempDf = tempDf.withColumn("seconds", second(col("time")))

        localdf = spark.createDataFrame(tempDf.rdd, outputBlobSchema)
    except Exception:
        # If a file fails to parse, fall through and return the empty DataFrame
        pass
    return localdf

filesDF = filesForProcessingDF.dropDuplicates(["value"]).orderBy("value").repartition(24)
# Create an empty dataset
processed_data = spark.createDataFrame(emptyRDD, outputBlobSchema)
# for each customer / web app combination to process
for row in customers_to_process.collect():
    containerName = row["ContainerName"]
    customerName = row["CustomerName"]
    webApp = row["WebApp"]

    for file in filesDF.filter(col("value").contains(containerName)).filter(col("value").contains(webApp)).collect():
        filePath = file.value
        processed_data = processed_data.union(process_files(filePath, customerName, webApp, YearNumber, MonthNumber, DayNumber))

data_lake_all_raw_path = "abfss://<value>@<value>.dfs.core.windows.net/IISLogs/raw/"

try:
    rawfilePath = data_lake_all_raw_path + f"{CustomerName}"
    
    processed_data.orderBy("webappname", "date", "time").repartition(col("daynumber"), col("hour")) \
        .coalesce(1) \
        .write \
        .partitionBy("date")\
        .option("header", True) \
        .option("compression", "snappy") \
        .mode("append") \
        .parquet(rawfilePath)
except Exception:
    # Log the output path that failed to write and continue
    print(rawfilePath)

mssparkutils.notebook.exit("Success")
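
For comparison with the per-file loop above, here is a minimal sketch of what a single-pass read for one web app and day could look like, so that Spark parallelises across the files instead of the driver looping over them. It assumes the header lines start with '#', that the fields are space-delimited in the order of outputBlobSchema, and that storage_account, container_name and web_app stand in for one row of customers_to_process; the wildcard would need adjusting to the real folder nesting.

from pyspark.sql.functions import col, split, input_file_name, lit

# Placeholder values; one row of customers_to_process would supply these
day_glob = f"{data_lake_externalReports}/{storage_account}/{container_name}/{web_app}/{YearNumber}/{MonthNumber}/{DayNumber}/*/*.log"

raw = (spark.read.text(day_glob)
       .withColumn("source_file", input_file_name())    # keep the originating file path if useful
       .filter(~col("value").startswith("#")))          # drop the '#'-prefixed header lines

parts = split(col("value"), " ")
parsed = raw.select(
    lit(CustomerName).alias("customername"),
    lit(web_app).alias("webappname"),
    parts.getItem(0).alias("date"),
    parts.getItem(1).alias("time"),
    parts.getItem(4).alias("cs_uri_stem"),
    # ... remaining fields by position, as in outputBlobSchema ...
)

(parsed.write
    .mode("append")
    .partitionBy("date")
    .option("compression", "snappy")
    .parquet(data_lake_all_raw_path + CustomerName))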

Thanks in advance

Azure Synapse Analytics

1 answer

  1. Vinodh247 34,661 Reputation points MVP Volunteer Moderator
    2023-06-24T11:46:55.7833333+00:00

    Hi,

    Thanks for reaching out to Microsoft Q&A.

    With "4 vCores / 32 GB) - 3 to 18 nodes" at max you will get 4vcores*18nodes=72 vCores available for processing. You will have to tweak your spark code with queue and worker count to utilize the parallel processing. See the link below for clarity where it has been clearly discussed regarding parallel processing.

    from threading import Thread
    from queue import Queue

    q = Queue()
    worker_count = 2

    https://dustinvannoy.com/2022/05/06/parallel-ingest-spark-notebook/
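
    For example, a rough sketch of that queue/worker pattern applied to your trigger notebook (the notebook name, timeout and parameters come from your code above; worker_count is illustrative):

    from queue import Queue, Empty
    from threading import Thread

    q = Queue()
    worker_count = 2

    for customerName in customerList:
        q.put(customerName)

    def worker():
        # Each worker thread pulls customer names off the queue and runs the child notebook
        while True:
            try:
                customerName = q.get_nowait()
            except Empty:
                break
            try:
                mssparkutils.notebook.run("IIS Files_ForDay", 3600, { "CustomerName": customerName, "YearNumber": YearNumber, "MonthNumber": MonthNumber, "DayNumber": DayNumber })
            finally:
                q.task_done()

    threads = [Thread(target=worker, daemon=True) for _ in range(worker_count)]
    for t in threads:
        t.start()
    q.join()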

    Please upvote and accept as answer if the reply was helpful; this will benefit other community members who run into the same issue.

