Background
I need to process IIS logs. The originals are standard IIS log files that sit in my Data Lake in a Customer/Year/Month/Day/Number/*.log directory structure.
Each day I want to convert that day's files into Parquet, grouped into one file per customer for the day. These files will then be further filtered and subset files created for Power BI reporting.
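To make the target concrete, here is a minimal sketch of the daily output I am aiming for (placeholder paths and an illustrative glob pattern, ignoring the header clean-up and column parsing for now - this is not my actual code):
from pyspark.sql.functions import input_file_name
# Read every log file for one customer's day in a single pass
daily = spark.read.text("abfss://<value>@<value>.dfs.core.windows.net/ExternalImports/IISLogs/<Customer>/2023/06/20/*/*.log")
daily = daily.withColumn("source_file", input_file_name())  # keep the origin path for reference
# One Parquet output per customer per day
daily.coalesce(1).write.mode("append").parquet("abfss://<value>@<value>.dfs.core.windows.net/IISLogs/raw/<Customer>")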
The Issue
I have written the script below using PySpark running in a Spark pool (Small, 4 vCores / 32 GB, 3 to 18 nodes). One notebook loops through each customer and triggers a second notebook that does the actual work.
The aim was to run these in parallel to reduce the running time. When everything runs in one notebook (essentially the logic below combined into a single notebook) it takes about 2-3 hours, and it is not much shorter when split across the two notebooks.
The first two lines of each log file need to be removed because the format is not great, so I re-create the files.
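In isolation, that per-file clean-up step looks roughly like this (the path is a placeholder; the full version is in the processing notebook below):
tempDf = spark.read.format("text").load("abfss://<value>@<value>.dfs.core.windows.net/.../<file>.log")
# Drop the first 2 lines and rebuild the DataFrame from the remaining rows
tempDf = spark.createDataFrame(tempDf.tail(tempDf.count() - 2))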
We have around 41 customers, each with at least two web apps, so roughly 88 items, and each day there can be around 16,000 log files.
Help Needed
How can I shorten the processing time? Our previous U-SQL process seemed able to do the same thing in around 10 minutes.
I would like help with the parallel processing or with speeding up the code. I don't think a medium or large Spark pool is needed, as the pool's memory is barely used.
Or is there a better way to do this?
Trigger Notebook
# Trigger Notebook
# Notebook Parameters
YearNumber = "2023"
MonthNumber = "06"
DayNumber = "20"
from notebookutils import mssparkutils
from pyspark.sql.functions import *
localReferenceStorageContainers_path = 'abfss://<value>@<value>.dfs.core.windows.net/ReferenceDB/Customers.parquet'
customers = spark.read.parquet(localReferenceStorageContainers_path)
customers = customers.filter(col("Active") == "1").orderBy(upper(col("CustomerName")))
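# Build a plain Python list of the active customer names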
customerList = []
for customer in customers.collect():
    customerList.append(customer.CustomerName)
def processNotebook(customerName):
    mssparkutils.notebook.run("IIS Files_ForDay", 3600, { "CustomerName": customerName, "YearNumber": YearNumber, "MonthNumber": MonthNumber, "DayNumber": DayNumber })
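# Run the per-customer notebook in parallel, up to 10 at a time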
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)
pool.map(processNotebook, customerList)
Processing Notebook
#IIS Files_ForDay
#Notebook Parameters
CustomerName = ""
YearNumber = ""
MonthNumber = ""
DayNumber = ""
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import ParseException
# Create a Spark Session
spark = SparkSession.builder.appName("AzureBloblistToParquet").getOrCreate()
# Create Schemas
customerSchema = StructType() \
.add("CustomerId",StringType(),True) \
.add("CustomerName",StringType(),True) \
.add("WebAppId",StringType(),True) \
.add("WebApp",StringType(),True) \
.add("StorageAccountName",StringType(),True) \
.add("ContainerName",StringType(),True)
outputBlobSchema = StructType() \
.add("customername", StringType(), True) \
.add("webappname", StringType(), True) \
.add("yearnumber", StringType(), True) \
.add("monthnumber", StringType(), True) \
.add("daynumber", StringType(), True) \
.add("date",StringType(),True) \
.add("time",StringType(),True) \
.add("s_sitename",StringType(),True) \
.add("cs_method",StringType(),True) \
.add("cs_uri_stem",StringType(),True) \
.add("cs_uri_query",StringType(),True) \
.add("s_port",StringType(),True) \
.add("cs_username",StringType(),True) \
.add("c_ip",StringType(),True) \
.add("cs_user_agent",StringType(),True) \
.add("cs_cookie",StringType(),True) \
.add("cs_referer",StringType(),True) \
.add("cs_host",StringType(),True) \
.add("sc_status",StringType(),True) \
.add("sc_substatus",StringType(),True) \
.add("sc_win32_status",StringType(),True) \
.add("sc_bytes",StringType(),True) \
.add("cs_bytes",StringType(),True) \
.add("time_taken",StringType(),True) \
.add("is_api",StringType(),True) \
.add("logdate",StringType(),True) \
.add("customer",StringType(),True) \
.add("year",StringType(),True) \
.add("month",StringType(),True) \
.add("day",StringType(),True) \
.add("hour",StringType(),True) \
.add("minutes",StringType(),True) \
.add("seconds",StringType(),True)
schema = StructType([
StructField('Path', StringType(), True)
])
# Create empty DataFrames
emptyRDD = spark.sparkContext.emptyRDD()
customers_to_process = spark.createDataFrame(emptyRDD, schema= customerSchema)
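# Load the customer / web app reference data and keep only the current customer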
localReferenceStorageContainers_path = 'abfss://<value>@<value>.dfs.core.windows.net/ReferenceDB/CustomersStructureForProcessing.parquet'
customers_to_process = spark.read.parquet(localReferenceStorageContainers_path)
customers_to_process = customers_to_process.filter(col("CustomerName") == CustomerName)
storageAccounts = customers_to_process.dropDuplicates(["StorageAccountName"]) \
.select("StorageAccountName")
def get_file_list(ls_path):
    for dir_path in mssparkutils.fs.ls(ls_path):
        if dir_path.isFile:
            yield dir_path.path
        elif dir_path.isDir and ls_path != dir_path.path:
            yield from get_file_list(dir_path.path)
lfs = []
data_lake_externalReports = "abfss://<value>@<value>.dfs.core.windows.net/ExternalImports/IISLogs"
filesForProcessingDF = spark.createDataFrame(emptyRDD, schema)
# For each storage account, get all the files for the customer that needs to be processed.
for accounts in storageAccounts.collect():
    for customerWebApp in customers_to_process.filter(col("StorageAccountName") == f"{accounts.StorageAccountName}").collect():
        data_lake_externalReportsPath = f"{data_lake_externalReports}/{accounts.StorageAccountName}/{customerWebApp.ContainerName}/{customerWebApp.WebApp}/{YearNumber}/{MonthNumber}/{DayNumber}/"
        try:
            for i in get_file_list(data_lake_externalReportsPath):
                lfs.append(i)
        except:
            # Do nothing - the path may not exist for this day
            pass
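# Build a single-column DataFrame of every log file path found for this customer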
filesForProcessingDF = spark.createDataFrame(lfs, StringType())
filesForProcessingDF = filesForProcessingDF.repartition(2)
# Calculate if the call is from an API
isAPIColumn = substring(lower(col("cs_uri_stem")), 1, 5) == "/api/"
def process_files(file, customerName, webApp, yearNumber, monthNumber, dayNumber):
    localdf = spark.createDataFrame(emptyRDD, schema = outputBlobSchema)
    try:
        # Load File into temp dataFrame
        tempDf = spark.read.format("text").load(file)
        # Remove the first 2 lines of the file
        tempDf = sqlContext.createDataFrame(tempDf.tail(tempDf.count()-2))
        # set the columns
        tempDf = tempDf.withColumn('customername', lit(customerName))
        tempDf = tempDf.withColumn('webappname', lit(webApp))
        tempDf = tempDf.withColumn('yearnumber', lit(yearNumber))
        tempDf = tempDf.withColumn('monthnumber', lit(monthNumber))
        tempDf = tempDf.withColumn('daynumber', lit(dayNumber))
        tempDf = tempDf.withColumn('date', split(tempDf[0], ' ').getItem(0))
        tempDf = tempDf.withColumn('time', split(tempDf[0], ' ').getItem(1))
        tempDf = tempDf.withColumn('s_sitename', split(tempDf[0], ' ').getItem(2))
        tempDf = tempDf.withColumn('cs_method', split(tempDf[0], ' ').getItem(3))
        tempDf = tempDf.withColumn('cs_uri_stem', split(tempDf[0], ' ').getItem(4))
        tempDf = tempDf.withColumn('cs_uri_query', split(tempDf[0], ' ').getItem(5))
        tempDf = tempDf.withColumn('s_port', split(tempDf[0], ' ').getItem(6))
        tempDf = tempDf.withColumn('cs_username', split(tempDf[0], ' ').getItem(7))
        tempDf = tempDf.withColumn('c_ip', split(tempDf[0], ' ').getItem(8))
        tempDf = tempDf.withColumn('cs_user_agent', split(tempDf[0], ' ').getItem(9))
        tempDf = tempDf.withColumn('cs_cookie', split(tempDf[0], ' ').getItem(10))
        tempDf = tempDf.withColumn('cs_referer', split(tempDf[0], ' ').getItem(11))
        tempDf = tempDf.withColumn('cs_host', split(tempDf[0], ' ').getItem(12))
        tempDf = tempDf.withColumn('sc_status', split(tempDf[0], ' ').getItem(13))
        tempDf = tempDf.withColumn('sc_substatus', split(tempDf[0], ' ').getItem(14))
        tempDf = tempDf.withColumn('sc_win32_status', split(tempDf[0], ' ').getItem(15))
        tempDf = tempDf.withColumn('sc_bytes', split(tempDf[0], ' ').getItem(16))
        tempDf = tempDf.withColumn('cs_bytes', split(tempDf[0], ' ').getItem(17))
        tempDf = tempDf.withColumn('time_taken', split(tempDf[0], ' ').getItem(18))
        # remove the full undelimited column
        tempDf = tempDf.drop(tempDf[0])
        # add additional columns
        tempDf = tempDf.withColumn('is_api', isAPIColumn)
        tempDf = tempDf.withColumn("logdate", col("date"))
        tempDf = tempDf.withColumn("customer", col("customername"))
        tempDf = tempDf.withColumn("year", col("yearnumber"))
        tempDf = tempDf.withColumn("month", col("monthnumber"))
        tempDf = tempDf.withColumn("day", col("daynumber"))
        tempDf = tempDf.withColumn("hour", hour(col("time")))
        tempDf = tempDf.withColumn("minutes", minute(col("time")))
        tempDf = tempDf.withColumn("seconds", second(col("time")))
        localdf = spark.createDataFrame(tempDf.rdd, outputBlobSchema)
    except:
        pass
    return localdf
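# De-duplicate the file list and spread it across 24 partitions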
filesDF = filesForProcessingDF.dropDuplicates(["value"]).orderBy("value").repartition(24)
# Create an empty dataset
processed_data = spark.createDataFrame(emptyRDD, outputBlobSchema)
# For each customer / web app combination, process its files and append them to the combined DataFrame
for row in customers_to_process.collect():
    containerName = row["ContainerName"]
    customerName = row["CustomerName"]
    webApp = row["WebApp"]
    for file in filesDF.filter(col("value").contains(containerName)).filter(col("value").contains(webApp)).collect():
        filePath = file.value
        processed_data = processed_data.union(process_files(filePath, customerName, webApp, YearNumber, MonthNumber, DayNumber))
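# Write the combined data out as Snappy Parquet, one folder per customer, partitioned by date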
data_lake_all_raw_path = "abfss://<value>@<value>.dfs.core.windows.net/IISLogs/raw/"
try:
    rawfilePath = data_lake_all_raw_path + f"{CustomerName}"
    processed_data.orderBy("webappname", "date", "time").repartition(col("daynumber"), col("hour")) \
        .coalesce(1) \
        .write \
        .partitionBy("date") \
        .option("header", True) \
        .option("compression", "snappy") \
        .mode("append") \
        .parquet(rawfilePath)
except:
    print(rawfilePath)
mssparkutils.notebook.exit("Success")
Thanks in advance