I am trying out a simple step that aggregates data from multiple files through an ML pipeline in an Azure ML workspace. We have multiple part files and I am trying to create one data file from them. I am mounting the Azure datastore path for read-write and writing data to it directly. The files get created, but they are corrupted and I am not able to preview them from the Azure ML datastore. Here is the relevant code for your reference.
Step-definition:
from azure.ai.ml import Input, Output
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.parallel import RunFunction, parallel_run_function

prepare_data_step = parallel_run_function(
    name="batch_prepare_data",
    display_name="Prepares Data (Batch)",
    description="Prepares data for training by combining customer data from individual files.",
    experiment_name=experiment_name,
    inputs={
        "customer_id": Input(
            type="string",
            description="Customer ID for the data preparation task",
        ),
        "input_data": Input(
            type=AssetTypes.URI_FOLDER,
            mode=InputOutputModes.RO_MOUNT,
            description="Input data folder containing customer files",
        ),
    },
    outputs={
        "train_data": Output(
            type=AssetTypes.URI_FOLDER,
            path=output_data_folder,
            mode=InputOutputModes.RW_MOUNT,
        ),
        "prepare_data_logs_output_path": Output(
            type=AssetTypes.URI_FOLDER,
            path=output_logs_path,
            mode=InputOutputModes.RW_MOUNT,
        ),
    },
    compute=compute_name,
    instance_count=5,
    mini_batch_size="100",
    max_concurrency_per_instance=2,
    mini_batch_error_threshold=-1,
    retry_settings=dict(max_retries=3, timeout=300),
    input_data="${{inputs.input_data}}",
    task=RunFunction(
        code="./",
        entry_script="prepare_data.py",
        environment=environment_id,
        program_arguments="--customer_id ${{inputs.customer_id}}"
        " --output_data_folder ${{outputs.train_data}}"
        " --mini_batch_timeout 600"
        " --logging_level DEBUG"
        " --resource_monitor_interval 300"
        " --copy_logs_to_parent True",
        append_row_to="${{outputs.prepare_data_logs_output_path}}",
    ),
)
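For context, output_data_folder and output_logs_path above point at datastore folders; the values below are placeholders to show the shape of those URIs, not the real paths:

# Placeholder values for the variables referenced in the step definition;
# the actual datastore and folder names differ.
output_data_folder = "azureml://datastores/<datastore_name>/paths/prepared_data/"
output_logs_path = "azureml://datastores/<datastore_name>/paths/prepare_data_logs/"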
prepare_data.py:
import os
import uuid
import pandas as pd

def run(mini_batch):
    # mini_batch is the list of input file paths handed to this worker by the parallel run step
    combined_df_list = []
    for client_file_path in mini_batch:
        combined_df_list.append(pd.read_parquet(client_file_path))
    client_df = pd.concat(combined_df_list, ignore_index=True)
    # output_data_folder is the mounted train_data output path (passed via --output_data_folder)
    global output_data_folder
    training_data_path = os.path.join(output_data_folder, f"dataset_{uuid.uuid4()}.snappy.parquet")
    client_df.to_parquet(training_data_path, index=False, engine='pyarrow', compression='snappy')
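For completeness, output_data_folder would be populated in init() from the program arguments defined in the step; a trimmed sketch of what that parsing amounts to (only the relevant argument shown):

import argparse

def init():
    # Sketch of the init() that sets the global used by run();
    # the other program_arguments are parsed the same way.
    global output_data_folder
    parser = argparse.ArgumentParser()
    parser.add_argument("--output_data_folder", type=str)
    args, _ = parser.parse_known_args()
    output_data_folder = args.output_data_folder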
The training_data_path is where I want the combined file to persist. After the job completes, the files do show up in the datastore, but they are very small and I get a ParquetException when I try to open or preview them.
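A read-back right after the write would show whether the file is already corrupt on the RW mount before the job finishes; a minimal sketch (the helper name is just illustrative):

import os
import pandas as pd

def verify_parquet(path):
    # Sketch: call this right after to_parquet() in run().
    # If the read fails here, the file is already corrupt on the RW mount
    # and the problem is not the sync back to the datastore.
    df = pd.read_parquet(path)
    print(f"{path}: {len(df)} rows, {os.path.getsize(path)} bytes")
    return df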