How do I persist data in Azure ML datastore?

Nik Adlakha 0 Reputation points
2025-06-09T22:40:07.0366667+00:00

I am trying out a simple step to aggregate data from multiple files through an ML pipeline in an Azure ML workspace. We have multiple part files and I am trying to create one data file from them. I am mounting the Azure datastore path for read-write and writing data to it directly. The files get created, but they are corrupted and I am not able to preview them from the Azure ML datastore. Here is the relevant code for reference.

Step-definition:


        prepare_data_step = parallel_run_function(
                                    name="batch_prepare_data",
                                    display_name="Prepares Data (Batch)",
                                    description="Prepares data for training by combining customer data from individual files.",
                                    experiment_name=experiment_name,
                                    inputs={
                                        "customer_id": Input( 
                                                            type="string",
                                                            description="Customer ID for the data preparation task",),
                                        "input_data": Input(
                                                            type=AssetTypes.URI_FOLDER, 
                                                            mode=InputOutputModes.RO_MOUNT,
                                                            description="Input data folder containing customer files"),
                                    },
                                    outputs={
                                        "train_data": Output(
                                                            type=AssetTypes.URI_FOLDER, 
                                                            path=output_data_folder,
                                                            mode=InputOutputModes.RW_MOUNT),
                                        "prepare_data_logs_output_path": Output(
                                                            type=AssetTypes.URI_FOLDER, 
                                                            path=output_logs_path, 
                                                            mode=InputOutputModes.RW_MOUNT),
                                    },
                                    compute=compute_name,
                                    instance_count=5,
                                    mini_batch_size="100",
                                    max_concurrency_per_instance=2,
                                    mini_batch_error_threshold=-1,
                                    retry_settings=dict(max_retries=3, timeout=300),
                                    input_data="${{inputs.input_data}}",
                                    task=RunFunction(
                                        code="./",
                                        entry_script="prepare_data.py",
                                        environment=environment_id,
                                        program_arguments="--customer_id ${{inputs.customer_id}}"
                                        " --output_data_folder ${{outputs.train_data}}"
                                        " --mini_batch_timeout 600"
                                        " --logging_level DEBUG"
                                        " --resource_monitor_interval 300"
                                        " --copy_logs_to_parent True",
                                        append_row_to="${{outputs.prepare_data_logs_output_path}}"
                                    )
                                )


prepare_data.py:


import os
import uuid
import pandas as pd

def init():
    # runs once per worker; the --output_data_folder program argument is
    # assumed to be parsed here into the global that run() uses
    import argparse
    global output_data_folder
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--output_data_folder", type=str)
    args, _ = parser.parse_known_args()
    output_data_folder = args.output_data_folder

def run(mini_batch):
    # read each part file in the mini-batch and combine into one DataFrame
    combined_df_list = [pd.read_parquet(p) for p in mini_batch]
    client_df = pd.concat(combined_df_list, ignore_index=True)

    # write the combined data to the output folder under a unique name
    training_data_path = os.path.join(output_data_folder, f"dataset_{uuid.uuid4()}.snappy.parquet")
    client_df.to_parquet(training_data_path, index=False, engine='pyarrow', compression='snappy')

    # a parallel run step expects one result row per mini-batch item
    return [f"processed {p}" for p in mini_batch]

The training_data_path is where I want the data to persist. After the job completes, the files do appear in the datastore, but they are very small, and I get a ParquetException when trying to open or preview them.


2 answers

  1. Pavankumar Purilla 8,335 Reputation points Microsoft External Staff Moderator
    2025-06-10T05:26:41.49+00:00

    Hi Nik Adlakha,

    You're likely encountering corrupted Parquet files due to the use of RW_MOUNT mode in a parallel pipeline step in Azure ML. While RW_MOUNT provides read-write access to mounted storage, it does not guarantee safe concurrent writes. When multiple parallel processes attempt to write simultaneously to the same mounted directory, it can lead to file corruption, incomplete writes, or unreadable outputs. This often manifests as very small or empty .parquet files that cannot be opened.

    A more reliable solution is to switch the output mode to UPLOAD. In UPLOAD mode, each compute node writes its output to a local temporary directory, and Azure ML safely uploads the output files to the datastore after job completion. This approach avoids race conditions and ensures data integrity, especially in parallel or distributed execution scenarios.
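    For example, a minimal sketch of the modified output configuration, assuming the same output_data_folder and output_logs_path variables from your step definition:

    outputs={
        "train_data": Output(
            type=AssetTypes.URI_FOLDER,
            path=output_data_folder,
            mode=InputOutputModes.UPLOAD),
        "prepare_data_logs_output_path": Output(
            type=AssetTypes.URI_FOLDER,
            path=output_logs_path,
            mode=InputOutputModes.UPLOAD),
    },

    Your prepare_data.py should not need to change: with UPLOAD, the output path Azure ML hands to the script resolves to a local directory on each node, and the contents are uploaded to the datastore once the job completes.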


  2. JAYA SHANKAR G S 4,035 Reputation points Microsoft External Staff Moderator
    2025-06-17T11:42:36.31+00:00

    Hello @Nik Adlakha ,

    If you write the output as an MLTable, you will be able to preview the output data.

    Below is the output configuration you need to modify.

    outputs={
        "train_data": Output(
            type=AssetTypes.MLTABLE,
            path=output_data_folder,
            mode=InputOutputModes.RW_MOUNT),
        "prepare_data_logs_output_path": Output(
            path=output_logs_path,
            mode=InputOutputModes.RW_MOUNT),
    },
    
    

    Just change the type from URI_FOLDER to MLTABLE and pass the same folder path as output_data_folder.

    Next, in prepare_data.py, read the parquet files using mltable and save the table to output_data_folder.

    Sample code to read the parquet files:

    import mltable

    paths = [
        {
            "pattern": "wasbs://******@azureopendatastorage.blob.core.windows.net/green/puYear=2015/puMonth=*/*.parquet"
        },
        {
            "pattern": "wasbs://******@azureopendatastorage.blob.core.windows.net/green/puYear=2016/puMonth=*/*.parquet"
        },
        {
            "pattern": "wasbs://******@azureopendatastorage.blob.core.windows.net/green/puYear=2017/puMonth=*/*.parquet"
        },
        {
            "pattern": "wasbs://******@azureopendatastorage.blob.core.windows.net/green/puYear=2018/puMonth=*/*.parquet"
        },
        {
            "pattern": "wasbs://******@azureopendatastorage.blob.core.windows.net/green/puYear=2019/puMonth=*/*.parquet"
        },
    ]

    # create a table from the parquet paths
    tbl = mltable.from_parquet_files(paths)

    # save the MLTable file to the output path
    tbl.save(path=output_data_folder)


    The above code will create an MLTable file in output_data_folder, which you can then preview. If you need a pandas DataFrame in further jobs, load the MLTable file and convert it like below.

    import mltable

    # load the saved MLTable and preview the first 5 rows
    tbl = mltable.load(output_data_folder)
    tbl.show(5)

    # convert to a pandas DataFrame for downstream use
    df = tbl.to_pandas_dataframe()

    Here, you can pass three types of paths:

    1. file
    2. folder
    3. pattern

    I have used pattern for the parquet files; the other two forms work the same way, as sketched below.
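
    For illustration, here is a minimal sketch of all three path forms; the storage account, container, and file names below are placeholders, not real locations:

    # each dict selects data differently; they can be mixed in one list
    paths = [
        {"file": "wasbs://container@account.blob.core.windows.net/data/part-000.parquet"},  # one file
        {"folder": "wasbs://container@account.blob.core.windows.net/data/2020/"},           # all files under a folder
        {"pattern": "wasbs://container@account.blob.core.windows.net/data/*/*.parquet"},    # glob pattern
    ]

    tbl = mltable.from_parquet_files(paths)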

    Output:

    [Screenshot: the MLTable output data asset previewed in Azure ML studio]

    Click on the output data asset and explore.

    [Screenshot: exploring the output data asset]

    Try it and let me know if you have any queries in the comments.

    Note: Make sure you have the mltable==1.3.0 and azureml-dataprep[pandas]==4.10.6 libraries installed in the environment of your job.
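
    If your job environment is built from a pip requirements file, a minimal sketch of the pins (the file name is illustrative):

    # requirements.txt (or the pip section of your conda environment file)
    mltable==1.3.0
    azureml-dataprep[pandas]==4.10.6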

    Refer to this documentation for more about using mltable.

    Thank you

