How to speed up unpacking .tgz files in ADF Copy activities? (60 min in the cloud vs. 1 min on a local machine)

Jannik Luxenburger 5 Reputation points
2023-02-27T10:23:27.3733333+00:00

Hi everyone,

We have a large amount of historical data stored in .tgz tarballs. We copy these .tgz files from a source into the landing zone of our Azure Data Lake Storage Gen2 account and want to unpack them into our conformance layer.

The .tgz files contain very large JSON documents, but as a first step we just want to decompress them.

Our setup: a Copy activity from ADLS Gen2 to the same ADLS Gen2 account, Binary to Binary.

This Copy activity produces the desired result but takes roughly 60 minutes to complete. Increasing the DIUs (Data Integration Units) or the parallel copy setting did not improve performance.

If we download the same .tgz to an ordinary laptop and decompress it with common tools (e.g. 7-Zip), it takes only 1-2 minutes.

We need to decompress thousands of these tarballs, so spending an hour of compute per archive across the full history is not feasible. If anybody has an idea how to get laptop-like performance in the cloud, it would be highly appreciated.

Here are the relevant parts of the configuration:

"typeProperties": {
    "source": {
        "type": "BinarySource",
        "storeSettings": {
            "type": "AzureBlobFSReadSettings",
            "recursive": true,
            "wildcardFileName": {
                "value": "@item().name",
                "type": "Expression"
            },
            "deleteFilesAfterCompletion": false
        },
        "formatSettings": {
            "type": "BinaryReadSettings",
            "compressionProperties": {
                "type": "TarGZipReadSettings"
            }
        }
    },
    "sink": {
        "type": "BinarySink",
        "storeSettings": {
            "type": "AzureBlobFSWriteSettings"
        }
    },
    "enableStaging": false
}
Azure Synapse Analytics
Azure Data Factory

1 answer

    Jannik Luxenburger 5 Reputation points
    2023-02-27T13:10:42.0733333+00:00

    I'm answering my own question after consulting Microsoft Support.

    A correction regarding the archive contents: each .tgz holds not one very large file but tens of thousands of small files, and that is where the performance problem came from. The bottleneck was I/O on the storage account, because too many files were being written in parallel.
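To check whether an archive really consists of many small files before choosing a batching strategy, you can list its entries without extracting anything. The sketch below is a stdlib-only alternative to the commons-compress TarArchiveInputStream used in the script in this answer: it decompresses with the JDK's GZIPInputStream and parses only the classic ustar header fields (size as octal at offset 124, type flag at offset 156). It is an assumption-laden diagnostic that ignores PAX and GNU extensions (long names, large-file sizes); the `TgzStats` object and its methods are hypothetical names.

```scala
import java.io.{EOFException, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

// Minimal .tgz inspector (classic ustar headers only; PAX/GNU extensions are not interpreted).
object TgzStats {
  final case class Stats(files: Int, totalBytes: Long) {
    def averageBytes: Long = if (files == 0) 0L else totalBytes / files
  }

  private val BlockSize = 512

  // Parses an octal header field, skipping leading spaces and stopping at NUL or space
  private def octal(header: Array[Byte], offset: Int, length: Int): Long = {
    val digits = new String(header, offset, length, StandardCharsets.US_ASCII)
      .dropWhile(_ == ' ')
      .takeWhile(c => c >= '0' && c <= '7')
    if (digits.isEmpty) 0L else java.lang.Long.parseLong(digits, 8)
  }

  // Fills buf completely or throws if the stream ends early
  private def readBlock(in: InputStream, buf: Array[Byte]): Unit = {
    var pos = 0
    while (pos < buf.length) {
      val n = in.read(buf, pos, buf.length - pos)
      if (n < 0) throw new EOFException("truncated tar archive")
      pos += n
    }
  }

  // Discards n bytes by reading them (InputStream.skip may skip fewer than requested)
  private def skipFully(in: InputStream, n: Long): Unit = {
    val buf = new Array[Byte](8192)
    var remaining = n
    while (remaining > 0) {
      val read = in.read(buf, 0, math.min(buf.length.toLong, remaining).toInt)
      if (read < 0) throw new EOFException("truncated tar entry")
      remaining -= read
    }
  }

  // Counts regular, non-empty files and their total size
  def stats(compressed: InputStream): Stats = {
    val in = new GZIPInputStream(compressed)
    try {
      val header = new Array[Byte](BlockSize)
      var files = 0
      var totalBytes = 0L
      readBlock(in, header)
      // An all-zero header block marks the end of the archive
      while (!header.forall(_ == 0)) {
        val size = octal(header, 124, 12)
        val typeFlag = header(156)
        // '0' or NUL means a regular file; directories, PAX headers, etc. are not counted
        if ((typeFlag == '0'.toByte || typeFlag == 0) && size > 0) {
          files += 1
          totalBytes += size
        }
        // Entry data is padded to a multiple of 512 bytes
        skipFully(in, (size + BlockSize - 1) / BlockSize * BlockSize)
        readBlock(in, header)
      }
      Stats(files, totalBytes)
    } finally in.close()
  }
}
```

For example, `TgzStats.stats(new java.io.FileInputStream("dump.tgz"))` on a locally downloaded archive (the file name is a placeholder) returns the number of regular files plus their total and average size; a large count with a small average points to the per-file write overhead described above.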

    Our resolution was to move away from the Copy activity and use a small Scala script instead.
    Batching the small JSON documents into .jsonl (JSON Lines) files of 1,000 lines each (one document per line) reduced the execution time from 60 minutes to 20 seconds.
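The batching idea can be separated from the storage I/O. This is a minimal sketch, assuming each JSON document is a single line without embedded newlines; `JsonLinesBatching`, `toJsonLinesBatches` and `fileCount` are hypothetical names, not part of the script in this answer. It groups documents into JSON Lines payloads of at most `batchSize` records and computes how many output files that produces.

```scala
// Illustrative batching helper (hypothetical names, not the author's script).
object JsonLinesBatching {
  // Groups documents into JSON Lines payloads of at most batchSize records each.
  // A trailing line terminator on a document is stripped, then every record ends with "\n".
  def toJsonLinesBatches(docs: Iterator[String], batchSize: Int): Iterator[String] = {
    require(batchSize > 0, "batchSize must be positive")
    docs.grouped(batchSize).map(batch => batch.map(_.stripLineEnd).mkString("", "\n", "\n"))
  }

  // Number of output files (and therefore file creations) for a given document count
  def fileCount(documents: Long, batchSize: Int): Long = {
    require(batchSize > 0, "batchSize must be positive")
    (documents + batchSize - 1) / batchSize
  }
}
```

With a hypothetical 30,000 documents per archive, `fileCount(30000, 1000)` returns 30, so the storage account sees 30 file creations instead of 30,000, which is the overhead the batching removes.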

    Reading .jsonl in Spark uses the same syntax as reading .json (spark.read.json): by default, Spark's JSON reader expects one JSON object per line.

    
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.commons.io.IOUtils
    import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
    import java.io.BufferedInputStream
    import java.net.URI
    
    val inputPath = "abfss://container@XYZ.dfs.core.windows.net/input/dump_From:2023-02-20_To:2023-02-21_On:2023-02-21_01-00-01.tgz"
    val outputPath = "abfss://container@XYZ.dfs.core.windows.net/Output/tar_files/dump/"
    
    // `sc` is the SparkContext provided by the notebook session
    val fileSystemClient = FileSystem.get(new URI("abfss://container@XYZ.dfs.core.windows.net/"), sc.hadoopConfiguration)
    val inputStream = new BufferedInputStream(fileSystemClient.open(new Path(inputPath)))
    
    // Decompress the gzip layer and read the tar archive as a stream of entries
    val gzInput = new GzipCompressorInputStream(inputStream)
    val tarInput = new TarArchiveInputStream(gzInput)
    
    // Batching state
    val batchSize = 1000
    var entryCounter = 0
    var batchCounter = 0
    var out: FSDataOutputStream = null
    
    try {
      // Initialize the iterator with the first entry of the tar archive
      var entry = tarInput.getNextEntry
      while (entry != null) {
        // Append only regular, non-empty files to the current batch
        if (!entry.isDirectory && entry.getSize > 0) {
          // Open the next batch file lazily, so no empty trailing file is created
          if (out == null) {
            batchCounter += 1
            out = fileSystemClient.create(new Path(outputPath + "batch_" + batchCounter + ".jsonl"))
          }
          // Read the current entry (the tar stream ends at the entry boundary) and write it out
          val bytes = IOUtils.toByteArray(tarInput)
          out.write(bytes)
          // Ensure every document ends with a newline, so each one is a single JSON Lines record
          // (assumes each JSON document is itself written on one line)
          if (bytes.last != '\n'.toByte) out.write('\n'.toInt)
          entryCounter += 1
          // Close the batch file once it holds batchSize documents
          if (entryCounter >= batchSize) {
            out.close()
            out = null
            entryCounter = 0
          }
        }
        entry = tarInput.getNextEntry
      }
    } finally {
      // Close the last, potentially incomplete batch file
      if (out != null) out.close()
      // Close the input streams in reverse order of creation
      tarInput.close()
      gzInput.close()
      inputStream.close()
    }
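As written, the script writes batch_1.jsonl, batch_2.jsonl, and so on directly into `outputPath`, and Hadoop's `FileSystem.create(Path)` overwrites existing files by default. Running it unchanged over thousands of archives would therefore let each archive overwrite the previous archive's batches. One option, sketched below with a hypothetical `BatchPaths` helper, is to write each archive's batches into its own subfolder named after the archive. Replacing characters such as ':' with '_' is a precaution chosen for this sketch, not a documented ADLS requirement.

```scala
// Hypothetical helper: one output subfolder per archive, so batch files from different archives don't collide.
object BatchPaths {
  // Replaces every character other than letters, digits, '.', '_' and '-' with '_'
  private def sanitize(s: String): String = s.replaceAll("[^A-Za-z0-9._-]", "_")

  def batchPath(outputDir: String, archivePath: String, batch: Int): String = {
    // Keep only the file name and drop the archive extension
    val fileName = archivePath.substring(archivePath.lastIndexOf('/') + 1)
    val stem = fileName.stripSuffix(".tgz").stripSuffix(".tar.gz")
    val dir = if (outputDir.endsWith("/")) outputDir else outputDir + "/"
    s"$dir${sanitize(stem)}/batch_$batch.jsonl"
  }
}
```

In the script, the `create` call would then become `fileSystemClient.create(new Path(BatchPaths.batchPath(outputPath, inputPath, batchCounter)))`.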
    