inferSchema error on Parquet files, mergeSchema is time consuming

P, John 240 Reputation points
2024-02-21T00:57:05.95+00:00

I stored the Parquet files in ADLS Gen2. Most of the time I read the files with no problem, but sometimes the collected Parquet files trigger a "cannot inferSchema" error. I googled the error; it is most likely caused by an empty Parquet file. One suggested fix is to turn on the "mergeSchema" option, like:

parquetFile = spark.read.option("ignoreCorruptFiles", "true").option("mergeSchema", "true").parquet(*fileForModels)

The option fixed the error, but the runtime is substantially longer (tripled or more). My questions are:

  1. How do I find the problematic Parquet files in ADLS Gen2? I checked each directory; the only zero-size file is "_SUCCESS", but it is present in every directory, and I don't think it is the problem (see the sketch below for how I scanned).
  2. Is there a better fix than the "mergeSchema" option?
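For context, this is roughly how I scanned for zero-size files, using the Hadoop FileSystem API through Spark's JVM gateway (the abfss:// path is a placeholder for my container):

hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
root = Path("abfss://<container>@<account>.dfs.core.windows.net/<models_dir>")
fs = root.getFileSystem(hadoop_conf)
files = fs.listFiles(root, True)  # True = recurse into subdirectories
while files.hasNext():
    f = files.next()
    if f.getLen() == 0:
        print("zero-byte file:", f.getPath().toString())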

Accepted answer
  Anand Prakash Yadav 7,840 Reputation points Microsoft External Staff
  2024-02-22T11:01:16.87+00:00

    Hello Pan, John,

    Thank you for posting your query here!

    You can use the following code to identify corrupt Parquet files in the container:

    from azure.storage.filedatalake import DataLakeServiceClient
    import pyarrow.parquet as pq
    from io import BytesIO
    
    
    def connect_to_adls(account_name, account_key):
        """Connect to the ADLS Gen2 account via its DFS endpoint."""
        try:
            service_client = DataLakeServiceClient(
                account_url=f"https://{account_name}.dfs.core.windows.net",
                credential=account_key)
            return service_client
        except Exception as e:
            print(f"Error connecting to ADLS: {e}")
            return None
    
    
    def list_parquet_files(file_system_client):
        """Recursively list every .parquet file in the file system."""
        parquet_files = []
        for path in file_system_client.get_paths():
            if path.name.endswith('.parquet'):
                parquet_files.append(path.name)
        return parquet_files
    
    
    def validate_parquet_files(file_system_client, parquet_files):
        """Download each file and try to parse it with pyarrow.
    
        Empty files and files with a missing or truncated footer raise an
        exception here, so they end up in the corrupt_files list.
        """
        corrupt_files = []
        for file in parquet_files:
            file_client = file_system_client.get_file_client(file)
            file_contents = file_client.download_file().readall()
            try:
                pq.read_table(BytesIO(file_contents))
            except Exception as e:
                print(f"Error reading file {file}: {e}")
                corrupt_files.append(file)
        return corrupt_files
    
    
    account_name = "<your_account_name>"
    account_key = "<your_account_key>"
    file_system_name = "<your_file_system_name>"
    
    service_client = connect_to_adls(account_name, account_key)
    
    if service_client:
        file_system_client = service_client.get_file_system_client(file_system=file_system_name)
        parquet_files = list_parquet_files(file_system_client)
        corrupt_files = validate_parquet_files(file_system_client, parquet_files)
        if corrupt_files:
            print("Corrupt files:")
            for file in corrupt_files:
                print(file)
        else:
            print("No corrupt files found.")
    

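    The script assumes the azure-storage-file-datalake and pyarrow packages are installed (pip install azure-storage-file-datalake pyarrow). Note that it downloads every file in full to validate it, so run it only when you need to pinpoint the corrupt files.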
    The mergeSchema option in Spark is used when reading Parquet files from multiple directories or with evolving schemas. When .option("mergeSchema", "true") is set, Spark reads the footer of every file, infers each file's schema, and merges them into a single schema. That extra pass over all of the footers is the overhead that makes your read substantially slower.

    You can explicitly specify the schema when reading the Parquet files, which skips schema inference entirely. Alternatively, consider storing the data as a Delta Lake table; the schema is then kept in the Delta transaction log and can be fetched much faster than by inspecting Parquet footers.
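    For example, here is a minimal sketch of the explicit-schema approach. The column names and types below are placeholders (we don't know your actual schema), so replace them with your own:

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType
    
    # Placeholder schema -- substitute your real column names and types.
    model_schema = StructType([
        StructField("model_id", StringType(), True),
        StructField("score", DoubleType(), True),
    ])
    
    # With an explicit schema, Spark performs no inference at read time, so
    # empty files no longer trigger the error and mergeSchema is unnecessary.
    parquetFile = (spark.read
                   .schema(model_schema)
                   .option("ignoreCorruptFiles", "true")
                   .parquet(*fileForModels))
    
    # If Delta Lake is available on your cluster (an assumption), an existing
    # Parquet directory can also be converted in place:
    # spark.sql("CONVERT TO DELTA parquet.`abfss://<container>@<account>.dfs.core.windows.net/<path>`")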

    Do let us know if you have any further queries. I’m happy to assist you further.

    Please do not forget to "Accept the answer” and “up-vote” wherever the information provided helps you, this can be beneficial to other community members.

