Hi,
I have a function called splitter with a blob trigger that uses Event Grid as its source. The only goal of this function is to grab a blob and partition it into several pieces so that it can be fed into an Event Hub, which has a publication limit of 1 MB.
This function is one of several functions that form an orchestration to ingest messages into Event Hub. The files we receive vary in size: 100 MB, 50 MB, 10 KB, 5 KB, etc.
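For context, the downstream publisher works roughly like this (a simplified sketch, not the real function; the connection string, hub name and payload handling are placeholders). The point is only to show why the 1 MB batch limit forces me to split the files first:

    # Simplified sketch of the downstream Event Hub publisher (placeholder names).
    from azure.eventhub import EventHubProducerClient, EventData

    producer = EventHubProducerClient.from_connection_string(
        "<EVENT_HUB_CONNECTION_STRING>", eventhub_name="<EVENT_HUB_NAME>")

    def publish_rows(rows):
        if not rows:
            return
        batch = producer.create_batch()      # the batch enforces the ~1 MB publication limit
        for row in rows:
            try:
                batch.add(EventData(row))
            except ValueError:               # current batch is full: send it and start a new one
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(EventData(row))
        producer.send_batch(batch)           # flush the remainder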
So, when deployed and running on Azure, I realized that the splitter function suddenly stops executing when I stress the orchestration by uploading up to 100 files, among them the 100 MB and 50 MB ones.
Locally, my integration tests run just fine. When running on Azure, the splitter function logs some messages and then suddenly stops executing.
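To give an idea of how I test it locally (this is illustrative only, not my real test file), I call splitter_fn directly with a fake InputStream and a mocked Context, and it completes without problems:

    # Illustrative local test sketch (not the actual test code).
    from unittest.mock import MagicMock

    import azure.functions as func

    class FakeBlob(func.InputStream):
        def __init__(self, data: bytes, name: str):
            self._data, self._name = data, name

        def read(self, size=-1) -> bytes:
            return self._data

        @property
        def name(self):
            return self._name

        @property
        def length(self):
            return len(self._data)

        @property
        def uri(self):
            return None

    def test_splitter_partitions_a_small_file():
        payload = b"col_a|col_b\r\n`1`|`x`\r\n`2`|`y`\r\n"
        ctx = MagicMock(spec=func.Context)
        ctx.invocation_id = "local-test"
        splitter_fn(FakeBlob(payload, "stage/sample.csv"), ctx)
        # assertions against the 'landing' container omitted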
This is the upper fragment of the splitter function:
def splitter_fn(blob: func.InputStream, context: func.Context):
    blob_content = blob.read()
    size_mb = round(len(blob_content) / (1024 * 1024), 2)
    logger.info(f'Downloaded blob | {blob.name} | {size_mb:,} MB', extra={
        'custom_dimensions': {
            'experiment': EXPERIMENT_NAME,
            'blob': blob.name,
            'size_mb': size_mb
        }
    })
    try:
        allowed_file_size_mb = float(EnvLoader.get_value('ALLOWED_FILE_SIZE_MB'))
        logger.info(f'ALLOWED_FILE_SIZE_MB set to {allowed_file_size_mb} MB')
        # We get the current metadata
        blob_service = BlobService()
        metadata = blob_service.get_blob_metadata(container='stage', blob_key=blob.name.split('/')[-1])
        partitioning = metadata['Partitioning']
        original_blob = metadata['OriginalBlob']
        if partitioning == 'Extra':
            q_files = 2
        elif partitioning == 'Normal':
            q_files = math.ceil(size_mb / allowed_file_size_mb) + 1
        else:
            raise ValueError(f'PartitionType "{partitioning}" is not allowed')
        logger.info(f'{q_files} files will be created after splitting | PartitionType: {partitioning}', extra={
            'custom_dimensions': {
                'experiment': EXPERIMENT_NAME,
                'q_files': q_files,
                'partitioning': partitioning
            }
        })
        # df = pandas.read_csv(StringIO(str(blob_content, 'utf-8')), sep='|', quotechar='`')
        df = pandas.read_csv(StringIO(blob_content.decode('utf-8')), sep='|', quotechar='`', quoting=csv.QUOTE_ALL)
        dfs = numpy.array_split(df, q_files)
        blob_service = BlobService()
        logger.info(f'Creating partitions | {len(dfs)} dataframes')
        for i, _df in enumerate(dfs):
            bytes_df = bytes(_df.to_csv(lineterminator='\r\n', index=False, sep='|', quotechar='`', quoting=csv.QUOTE_ALL), encoding='utf-8')
            blob_key = f"{original_blob}__{context.invocation_id}_partition_{i+1}.csv"
            metadata = blob_service.upload_file(
                data=bytes_df,
                blob_key=blob_key,
                container='landing',
                metadata={
                    'Source': 'Splitter',
                    'OriginalBlob': original_blob,
                    'PartitionNumber': str(i+1)
                }
            )
            size_mb = round(len(bytes_df) / (1024 * 1024), 2)
            logger.info(f'Uploaded file | landing/{blob_key} | Size: {size_mb:,} MB | Rows: {_df.shape[0]:,}', extra={
                'custom_dimensions': {
                    'experiment': EXPERIMENT_NAME,
                    'blob': f'landing/{blob_key}',
                    'size_mb': size_mb,
                    'rows': _df.shape[0],
                    'partition_number': i+1
                }
            })
        blob_service.close()
    except KeyError as ex:
        logger.exception(f'{type(ex)}: {ex}')
        logger.exception(traceback.format_exc())
    except Exception as ex:
        logger.exception(f'{type(ex)}: {ex}')
        logger.exception(traceback.format_exc())
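To give an idea of the volume, this is roughly how many partitions a 'Normal' split produces (the ALLOWED_FILE_SIZE_MB value below is made up just for illustration; the real one comes from app settings):

    import math

    allowed_file_size_mb = 0.5            # hypothetical value, for illustration only
    for size_mb in (100, 50, 5, 0.01):    # rough sizes of the files we receive
        q_files = math.ceil(size_mb / allowed_file_size_mb) + 1
        print(f'{size_mb} MB -> {q_files} partitions')
    # 100 MB -> 201, 50 MB -> 101, 5 MB -> 11, 0.01 MB -> 2

So, depending on the configured limit, a single 100 MB file can fan out into a couple of hundred blob uploads from one invocation.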
So, I went to Log Analytics to figure out what happened. This is what I found: a System.Threading.Tasks.TaskCanceledException. The error is unknown to me, since it never happened locally. In other runs on Azure, the function also stops suddenly with the same error.
I would appreciate any help, since I don't know why this is happening. The only clue I have is that the situation only happens when I pass 100 MB or 70 MB files to the function...
Regards