Source value error in reusable dataflow

Kleber Rebello 0 Reputation points
2025-04-07T11:20:27.8033333+00:00

Hi,

We have a REUSABLE ADF dataflow loading a (parameterized) Synapse table from a (also parameterized) parquet file.

It failed because a source value exceeded the 8,000-character maximum that Synapse accommodates. Our source is Salesforce, which does not offer a substring function we could use to address this.

Do we have an option to have the dataflow simply truncate the source value by default?

Please advise.

Azure Data Factory

1 answer

Sort by: Most helpful
  1. AnnuKumari-MSFT 34,556 Reputation points Microsoft Employee Moderator
    2025-04-21T14:28:06.6333333+00:00

    Hi Kleber Rebello,

    Thank you for using the Microsoft Q&A platform and for posting your query here.

    As per my understanding, you want to truncate values in multiple columns of the source parquet files before loading them into Synapse.

    As mentioned by other members, there is no direct way to achieve this in a data flow; the best built-in option is the approach already shared above (typically a derived column that truncates long strings, e.g. with the left() expression function). Since your requirement is a flexible approach that handles multiple columns without knowing the data types of the columns across multiple tables, I would suggest trying custom Python code and running it in an Azure Function, which can be triggered from ADF.

    Below is sample code that would be a better fit in your case for a dynamic solution. Kindly modify it as per your source schema:

    import pandas as pd
    
    def truncate_columns(df, columns, max_length=8000):
        """
        Truncate values in specified columns to a maximum length.
        
        Parameters:
        df (pd.DataFrame): DataFrame containing the data.
        columns (list): List of column names to be truncated.
        max_length (int): Maximum length of the values. Default is 8000.
        
        Returns:
        pd.DataFrame: DataFrame with truncated values.
        """
        for column in columns:
            if column in df.columns:
                df[column] = df[column].apply(lambda x: x[:max_length] if isinstance(x, str) and len(x) > max_length else x)
        return df
    
    def process_files(input_files, output_files, columns_to_truncate):
        """
        Process multiple files to truncate values in specified columns.
        
        Parameters:
        input_files (list): List of input file paths.
        output_files (list): List of output file paths.
        columns_to_truncate (list): List of column names to be truncated.
        """
        for input_file, output_file in zip(input_files, output_files):
            # Read the data from the source file
            df = pd.read_parquet(input_file)
            
            # Truncate values in specified columns
            df = truncate_columns(df, columns_to_truncate)
            
            # Write the processed data back to a new file
            df.to_parquet(output_file)
    
    # Example usage
    input_files = ['source_file1.parquet', 'source_file2.parquet']
    output_files = ['processed_file1.parquet', 'processed_file2.parquet']
    columns_to_truncate = ['column1', 'column2', 'column3']
    
    process_files(input_files, output_files, columns_to_truncate)
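
    Note that pandas reads each parquet file fully into memory, so for very large files you may want to process the data in chunks (for example, with pyarrow's dataset API) before applying the same truncation logic.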
    
    

    You can also adapt this script to run as an Azure Function, allowing you to process data dynamically based on HTTP requests.

    import logging
    import azure.functions as func
    import pandas as pd
    
    def truncate_columns(df, columns, max_length=8000):
        # Truncate string values in the given columns to at most max_length characters
        for column in columns:
            if column in df.columns:
                df[column] = df[column].apply(lambda x: x[:max_length] if isinstance(x, str) else x)
        return df
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Python HTTP trigger function processed a request.')
    
        # Read the records from the JSON request body
        try:
            data = req.get_json()
        except ValueError:
            return func.HttpResponse('Request body must be valid JSON.', status_code=400)
        df = pd.DataFrame(data)
    
        # Column names arrive as a comma-separated query parameter, e.g. ?columns=column1,column2
        columns_param = req.params.get('columns')
        if not columns_param:
            return func.HttpResponse("Missing 'columns' query parameter.", status_code=400)
        columns_to_truncate = columns_param.split(',')
    
        # Truncate values in the specified columns
        df = truncate_columns(df, columns_to_truncate)
    
        # Convert the DataFrame back to JSON
        processed_data = df.to_json(orient='records')
    
        return func.HttpResponse(processed_data, mimetype='application/json')
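
    For a quick test outside ADF, the function can be invoked with a plain HTTP POST. The sketch below is a minimal example assuming a hypothetical Function App URL; the columns to truncate go in the columns query parameter and the records in the JSON body. In a pipeline, the same call can be made with the Azure Function activity or a Web activity.

    import requests
    
    # Hypothetical endpoint; replace with your deployed Function App URL and key
    url = 'https://<your-function-app>.azurewebsites.net/api/truncate'
    
    # One record with a value longer than the 8,000-character Synapse limit
    records = [{'column1': 'x' * 10000, 'column2': 'short value'}]
    
    # Pass the columns to truncate as a query parameter and the data as the JSON body
    response = requests.post(url, params={'columns': 'column1,column2'}, json=records)
    print(response.json())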
    
    
    

    Hope it helps. Kindly accept the answer by clicking on the Accept answer button. Thank you.

