Using pySpark in Azure Synapse to read files from a non-Data Lake storage account

Paul Beare 25 Reputation points
2023-05-01T21:14:39.3233333+00:00

Hi, I have a storage account where the logs of websites go. This is a Gen2 storage account but NOT a Data Lake. I want to read the log files from this account from our Synapse workspace using PySpark (or any of the other languages) to filter/process the data before loading the results into a data lake attached to the workspace, then use serverless SQL to make the data available to Power BI report(s). I have managed to get the files in via a pipeline; however, this generated a lot of data movement expense. Is there a better way of doing this?

Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.

Accepted answer
  1. FERGUS ESSO KETCHA ASSAM 120 Reputation points Student Ambassador
    2023-05-09T09:36:29.9133333+00:00

    If you just want to read log files from the Gen2 storage account in your Synapse workspace using PySpark and save the results to an ADLS Gen2 account, here is an example code snippet:

    # Import required modules
    from pyspark.sql.functions import col
    # Define source storage account credentials
    storage_account_name = '<storage_account_name>'
    storage_account_key = '<storage_account_key>'
    container_name = '<container_name>'
    folder_path = '<folder_path>'
    # Register the account key so Spark can authenticate against the Blob endpoint
    spark.conf.set(f'fs.azure.account.key.{storage_account_name}.blob.core.windows.net', storage_account_key)
    # Create a PySpark DataFrame from the log files (CSV with header, schema inferred)
    df_logs = spark.read.format('csv').option('header', True).option('inferSchema', True).load(f'wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{folder_path}')
    # Filter and process the data: keep successful requests and count hits per URL
    df_filtered = df_logs.filter(col('status') == 200).groupBy(col('url')).count().orderBy(col('count').desc())
    # Write the filtered data to the data lake attached to the workspace
    # (note the container name before the '@' in the abfss URI)
    df_filtered.write.format('parquet').mode('overwrite').option('compression', 'snappy').save('abfss://<container_name>@<data_lake_name>.dfs.core.windows.net/<data_lake_folder>')
    

    Here we set the source account key in the Spark configuration, then read the log files from the specified folder path in the container using spark.read. We then filter the data based on a condition, group it, and count the results. Finally, we write the filtered data to the data lake attached to the Synapse workspace using df_filtered.write.
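
    If you prefer not to put the full account key in the notebook, a SAS token scoped to the source container should also work with the WASB driver. A minimal sketch, assuming a placeholder '<sas_token>' with read/list permissions:

    # Sketch: authenticate to the source Blob container with a SAS token instead of the account key
    spark.conf.set(f'fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net', '<sas_token>')
    # The wasbs:// read path is the same as above
    df_logs = spark.read.format('csv').option('header', True).load(f'wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{folder_path}')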

    Hope this helps


1 additional answer

  1. FERGUS ESSO KETCHA ASSAM 120 Reputation points Student Ambassador
    2023-05-09T09:33:39.5+00:00

    To list all blobs and subdirectories in a given storage account, you can use the Azure Storage SDK for Python to enumerate the containers and blobs in the storage account. Here's an example code snippet:

    from azure.storage.blob import BlobServiceClient

    # Create a BlobServiceClient object
    conn_str = "DefaultEndpointsProtocol=https;AccountName=<your_account_name>;AccountKey=<your_account_key>;EndpointSuffix=core.windows.net"
    blob_service_client = BlobServiceClient.from_connection_string(conn_str)

    # List all containers in the storage account
    containers = blob_service_client.list_containers()

    # Loop through all containers
    for container in containers:
        print("Container name: " + container.name)

        # List all blobs (including those under virtual subdirectories) in the container
        container_client = blob_service_client.get_container_client(container.name)
        blobs = container_client.list_blobs()

        # Loop through all blobs
        for blob in blobs:
            print("Blob name: " + blob.name)
    
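    If the logs always land under a known prefix, you can also filter while enumerating and hand the matching paths straight to Spark. A minimal sketch, assuming a hypothetical 'weblogs/' prefix and '.log' file names, reusing the placeholder account and container names from above:

    # Sketch: enumerate only blobs under a prefix and build wasbs:// paths for Spark
    # ('weblogs/' and the '.log' suffix are assumptions for illustration)
    container_client = blob_service_client.get_container_client("<container_name>")
    log_paths = [
        f"wasbs://<container_name>@<your_account_name>.blob.core.windows.net/{b.name}"
        for b in container_client.list_blobs(name_starts_with="weblogs/")
        if b.name.endswith(".log")
    ]
    # spark.read accepts a list of paths, so only the matching files are read
    df_logs = spark.read.text(log_paths)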

    To filter and write the blobs to another storage account, you can use the following steps:

    1. Create a PySpark DataFrame that reads the blobs from the source storage account using the binaryFile data source.
    2. Use the filter method to select the blobs that match the filter criteria.
    3. Write the filtered records to the target storage account using the write method (the binaryFile source is read-only, so save them in a writable format such as Parquet).
    from pyspark.sql.functions import col

    # Read the blobs from the source storage account.
    # The binaryFile source has a fixed schema (path, modificationTime, length, content),
    # so no user-defined schema is needed.
    df = spark.read.format("binaryFile").option("recursiveFileLookup", "true").load("wasbs://<source_container>@<source_account>.blob.core.windows.net/")

    # Filter the blobs based on the file name
    filtered_df = df.filter(col("path").contains("example")).select("path", "content")

    # Write the filtered records to the target storage account.
    # The binaryFile source does not support writing, so persist the path/content columns as Parquet.
    # (As before, the account keys or SAS tokens for both accounts must be set in the Spark configuration.)
    filtered_df.write.format("parquet").mode("overwrite").save("wasbs://<target_container>@<target_account>.blob.core.windows.net/<target_folder>")
    
    
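    Since web logs are text files, the binary 'content' column can be decoded back to strings before further processing. A minimal sketch, assuming the logs are UTF-8 encoded:

    # Sketch: decode the binary 'content' column to text (assumes UTF-8 log files)
    from pyspark.sql.functions import col, decode, explode, split

    df_text = filtered_df.withColumn("text", decode(col("content"), "UTF-8"))
    # Optionally split each file into individual log lines for row-level filtering
    df_lines = df_text.select(col("path"), explode(split(col("text"), "\n")).alias("line"))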
