Azure Function fails suddenly

Jona 455 Reputation points
2024-07-11T02:50:12.41+00:00

Hi,

I have a function called splitter with a blob trigger that uses Event Grid as its source. The only goal of this function is to grab a blob and partition it into several pieces, so that they can be fed into an Event Hub, which has a publication limit of 1MB.

This function is one of several that form an orchestration to inject messages into Event Hub. The files we receive vary in size: 100MB, 50MB, 10KB, 5KB, etc.

So, when deployed and running on Azure, I realized that the splitter function stops executing suddenly when I stress the orchestration by uploading up to 100 files. Those files include the 100MB and 50MB ones.

Locally, my integration tests run just fine. When running on Azure, the splitter function logs some messages and suddenly stops executing.

log7

This is the upper fragment of the function splitter:

def splitter_fn(blob:func.InputStream, context:func.Context):

    blob_content = blob.read()
    size_mb = round(len(blob_content) / (1024 * 1024), 2)
    logger.info(f'Downloaded blob | {blob.name} | {size_mb:,} MB', extra={
        'custom_dimensions' : {
            'experiment' : EXPERIMENT_NAME,
            'blob' : blob.name,
            'size_mb' : size_mb  
        }         
    })

    try:
        allowed_file_size_mb = float(EnvLoader.get_value('ALLOWED_FILE_SIZE_MB'))
        logger.info(f'ALLOWED_FILE_SIZE_MB set to {allowed_file_size_mb} MB')

        # We get the current metadata
        blob_service = BlobService()
        metadata = blob_service.get_blob_metadata(container='stage', blob_key=blob.name.split('/')[-1])
        partitioning = metadata['Partitioning']
        original_blob = metadata['OriginalBlob']

        if partitioning == 'Extra':
            q_files = 2
        elif partitioning == 'Normal':
            q_files = math.ceil(size_mb / allowed_file_size_mb) + 1
        else:
            raise ValueError(f'PartitionType \"{partitioning}\" is not allowed')       
        
        logger.info(f'{q_files} files will be created after splitting | PartitionType: {partitioning}', extra={
            'custom_dimensions' : { 
                'experiment' : EXPERIMENT_NAME,
                'q_files' : q_files,
                'partitioning' : partitioning  
            }           
        })

        #df = pandas.read_csv(StringIO(str(blob_content ,'utf-8')), sep='|', quotechar='`')
        df = pandas.read_csv(StringIO(blob_content.decode('utf-8')), sep='|', quotechar='`', quoting=csv.QUOTE_ALL)
        dfs = numpy.array_split(df, q_files)

        blob_service = BlobService()
        logger.info(f'Creating partitions | {len(dfs)} dataframes')
        for i, _df in enumerate(dfs):

            bytes_df = bytes(_df.to_csv(lineterminator='\r\n', index=False, sep='|', quotechar='`', quoting=csv.QUOTE_ALL), encoding='utf-8')
            blob_key = f"{original_blob}__{context.invocation_id}_partition_{i+1}.csv"
            metadata = blob_service.upload_file(
                data=bytes_df, 
                blob_key=blob_key, 
                container='landing',
                metadata={
                    'Source' : 'Splitter',
                    'OriginalBlob' : original_blob,
                    'PartitionNumber' : str(i+1)
                }
            )

            size_mb = round(len(bytes_df) / (1024 * 1024), 2)
            logger.info(f'Uploaded file | landing/{blob_key} | Size: {size_mb:,} MB | Rows: {_df.shape[0]:,}', extra={
                'custom_dimensions' : {
                    'experiment' : EXPERIMENT_NAME,
                    'blob' : f'landing/{blob_key}',
                    'size_mb' : size_mb,
                    'rows' : _df.shape[0],
                    'partition_number' : i+1  
                }              
            })

        blob_service.close()
    except KeyError as ex:
        logger.exception(f'{type(ex)}: {ex}')
        logger.exception(traceback.format_exc())

    except Exception as ex:
        logger.exception(f'{type(ex)}: {ex}')
        logger.exception(traceback.format_exc())
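
To make the partition sizing concrete, the `q_files` branch of the function above can be checked in isolation. This is only a minimal sketch: `partition_count` is a hypothetical helper name, and the value of ALLOWED_FILE_SIZE_MB is not given in the post, so 1.0 is assumed here.

```python
import math


def partition_count(size_mb: float, allowed_file_size_mb: float,
                    partitioning: str = "Normal") -> int:
    """Mirror the q_files logic of splitter_fn (hypothetical helper)."""
    if partitioning == "Extra":
        return 2
    if partitioning == "Normal":
        # Same formula as in the function: one extra partition as headroom.
        return math.ceil(size_mb / allowed_file_size_mb) + 1
    raise ValueError(f'PartitionType "{partitioning}" is not allowed')


# Assumed limit of 1.0 MB per partition; the real setting is unknown.
for size_mb in (100, 50, 0.01):
    print(f"{size_mb} MB -> {partition_count(size_mb, 1.0)} partitions")
```

Under that assumed limit, a single 100MB file is split into about a hundred partitions, each serialized and uploaded within the same invocation.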

So, I went to Log Analytics to figure out what happened. This is what I found:

log8 log9 log10

The error is unknown to me, since it never happened locally. In other runs on Azure, the function also stops suddenly, with the error shown here (System.Threading.Tasks.TaskCanceledException).

I would appreciate any help, since I don't know why this is happening. The only clue I have is that the situation only happens when I pass files of 100MB or 70MB to the function...
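
Since the failures only show up with the large files, one thing that could be tested is peak memory: the function above holds the raw bytes, the decoded string, the full DataFrame and each partition's CSV bytes at the same time. The following is a minimal sketch, assuming memory pressure plays a role (nothing in the logs confirms this). It reads the CSV in row chunks through the `chunksize` parameter of `pandas.read_csv` instead of loading everything and calling `numpy.array_split`. `iter_partitions` and `rows_per_partition` are hypothetical names, and `io.BytesIO` stands in for the blob stream; whether `func.InputStream` can be passed directly was not verified.

```python
import csv
import io
from typing import BinaryIO, Iterator

import pandas


def iter_partitions(stream: BinaryIO, rows_per_partition: int) -> Iterator[bytes]:
    """Yield CSV partitions one at a time instead of building them all up front."""
    reader = pandas.read_csv(
        stream,
        sep='|',
        quotechar='`',
        quoting=csv.QUOTE_ALL,
        encoding='utf-8',
        chunksize=rows_per_partition,  # returns an iterator of DataFrames
    )
    for chunk in reader:
        # Same serialization settings as the original function.
        yield chunk.to_csv(
            lineterminator='\r\n', index=False, sep='|',
            quotechar='`', quoting=csv.QUOTE_ALL,
        ).encode('utf-8')


# Stand-in data: a header plus five rows, split into chunks of two rows.
sample = b'`a`|`b`\r\n`1`|`x`\r\n`2`|`y`\r\n`3`|`z`\r\n`4`|`w`\r\n`5`|`v`\r\n'
parts = list(iter_partitions(io.BytesIO(sample), rows_per_partition=2))
print(len(parts))
```

Unlike `numpy.array_split`, this splits by a fixed number of rows rather than into a fixed number of pieces, so the row count per partition would have to be chosen so that each piece stays under the size limit.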

Regards

Azure Functions

1 answer

  1. navba-MSFT 20,215 Reputation points Microsoft Employee
    2024-07-13T04:31:29.8366667+00:00

    @Jona Thanks for getting back.

    As per your request, I have enabled a one-time free support ticket for your subscription-id 02f56XXX-XXXX-XXXX-XXXX-XXXX7ec3cb for quick and immediate assistance.

    Details on how to raise a service request below:

    • Go to the Health Advisory section within the Azure Portal: https://aka.ms/healthadvisories
    • Select the Issue Name "You have been enabled for one-time Free Technical Support"
    • Details will populate below in the Summary Tab within the reading pane and you can click on the link "Create a Support Request" to the right of the message

    Kindly let me know what your support request number is so that I can keep track of your case. If you run into any issues, feel free to let me know.

    After the ticket is created, a Microsoft Support professional will get in touch with you and assist you further.

    Hope this helps.