Using spark.sql.execution.datasources.csv.VegasCSVFileFormat and encountering NullPointerException
I use Synapse Spark for workloads that consume data from CSV files.
I keep encountering a Java NullPointerException (NPE) when reading CSV files out of the storage account. This is in a Synapse pool with the default "intelligent cache" settings, and I am reading data from ADLS Gen2 storage.
Below is a stack trace from the executor. Don't be misled by the "WARN" severity level at the top; the failure eventually causes the driver to crash.
2023-11-03 12:35:51,155 WARN BlockManager [Executor task launch worker for task 0.0 in stage 43.0 (TID 52)]: Putting block rdd_132_0 failed due to exception java.io.IOException: NullPointerException
java.lang.NullPointerException
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:121)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:65)
at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:97)
>>>>>>>>> at org.apache.spark.sql.execution.datasources.csv.VegasCSVFileFormat.$anonfun$buildReader$1(VegasCSVFileFormat.scala:423)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:150)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:135)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:162)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:224)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:137)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:763)
at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:239)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:318)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1482)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1409)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1473)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:385)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I'm not really a Java or Scala programmer, but based on the stack trace it looks to me like Microsoft has replaced the default handling of CSV read operations. There is a class called "VegasCSVFileFormat" in this stack trace; I'm not sure how it got there, but it is suspicious to me. I distrust it because (1) it does not turn up in a Google search, and (2) the "Vegas" code name has Microsoft's name written all over it. I suspect it is related to their home-grown "intelligent cache" technology.
I will also send this through CSS (Microsoft's secondary, "professional" support service), but that can be a multi-month engagement just to get a diagnosis. I'm hoping someone is already aware of this buggy component and knows how I might simply disable it.
The code that reads this CSV into my Spark session is straightforward. There is nothing fancy going on here, aside from the C# language bindings. This driver-side code would NOT have any impact on the executors that are trying to load the CSV and failing, and I'm certain the same bug would happen in a Python or Scala repro. Roughly, the read looks like the sketch below.
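To give a sense of it, here is a minimal sketch using the C# bindings (Microsoft.Spark); the storage path and options are placeholders, not my real ones:

using Microsoft.Spark.Sql;

// Driver-side read of the intermediate CSV (placeholder path and options).
SparkSession spark = SparkSession.Builder().GetOrCreate();

DataFrame df = spark.Read()
    .Option("header", true)
    .Option("inferSchema", true)
    .Csv("abfss://<container>@<storageaccount>.dfs.core.windows.net/intermediate/data.csv");

df.Show(5);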
My CSV file is a bit on the hefty side (200 MiB), and I may one day use another format for this intermediate data, but I don't want Microsoft to force the issue right now because of a buggy component that throws NPEs.
The error surfaces within about 500 ms, on two executors with four cores each.
I am encountering this error about 5 to 10 percent of the time, and it seems to be a fairly recent problem, appearing within the past couple of months. It is just a theory, but I suspect the error message itself is misleading and could probably be enhanced. There may be an underlying root cause beneath the NPE that is being obscured by poor structured error handling. (For example, my workloads run in a Synapse "Managed VNET", which has always seemed unreliable, and there may be socket exceptions that prevent file data from being retrieved consistently. Socket exceptions are the type of thing that no Microsoft programmer working on the "VegasCSVFileFormat" would have planned for. Few of us expect to encounter unreliable networking when connecting to resources in the same region, only ~0 ms away, but that type of problem has become a recurring theme on Synapse.)
Please let me know if anyone has experienced random NPEs on Synapse Spark, especially while using the "VegasCSVFileFormat". My goal is to figure out how to disable the Microsoft customizations in the reading of CSV files. I suspect the root cause of this problem wouldn't go away (e.g. the underlying socket exception), but perhaps I would start getting better exception messages from the actual Apache libraries. I already have another ticket open with Microsoft about the unreliability of their "Managed VNET", but they haven't been motivated to troubleshoot or fix it yet. Perhaps if I can show them yet another repro, that would help.
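If the "intelligent cache" really is the culprit, I believe it can be turned off. This is an assumption on my part, based on my reading of the Synapse intelligent cache documentation, so please correct me if the setting name is wrong; it appears the cache is controlled by spark.synapse.vegas.useCache, which from the C# bindings would look something like:

// Assumption: the Synapse intelligent cache is toggled via spark.synapse.vegas.useCache.
// Verify the setting name against the Synapse docs; it may need to be applied in the
// pool/session configuration rather than changed mid-session.
spark.Conf().Set("spark.synapse.vegas.useCache", "false");

If anyone has confirmed whether disabling it this way also swaps the reader back to the stock Apache CSV code path, that would be very helpful to know.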