Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files

Anonymous
2023-03-15T15:04:49.8033333+00:00
Py4JJavaError: An error occurred while calling o2391.count.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#216]
+- *(5) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#2072L])
   +- *(5) Project
      +- *(5) BroadcastHashJoin [KeyId#1696L], [KeyId#1778L], LeftOuter, BuildRight, false
         :- *(5) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Round_Trip_Discount#1506)) AS Round_Trip_Discount#1506, knownfloatingpointnormalized(normalizenanandzero(Discount_Rate#936)) AS Discount_Rate#936, Create_Date#1240, Deleted_From_Source_Flag#1582, Account_Note#1164, Booking_Guarantee_Expiration_Date#974, Is_Membership_Transaction_Bonus_Account#1202L, Member_Id#860L, Month#252, knownfloatingpointnormalized(normalizenanandzero(Deposit_Base_Amount#1126)) AS Deposit_Base_Amount#1126, Inactive_Override#898L, Inactive_Date#670, Ato_Fund_Forfeiture_Date#1392, Account_Id#328L, Description#480, Aircraft_Type_Id#822L, Account_Description_Id#518L, Year#215, Purchase_Date#1012, Account_Type#442L, _Ts#290L, knownfloatingpointnormalized(normalizenanandzero(Peak_Day_Surcharge#1316)) AS Peak_Day_Surcharge#1316, Update_Date#366, Create_User#1278, ... 13 more fields], functions=[], output=[KeyId#1696L])
         :  +- *(5) Project [_ts#33L AS _Ts#290L, accountId#34L AS Account_Id#328L, updateDate#35 AS Update_Date#366, updateUser#36 AS Update_User#404, accountType#37L AS Account_Type#442L, description#38 AS Description#480, accountDescriptionId#39L AS Account_Description_Id#518L, beginningBalance#40 AS Beginning_Balance#556, beginningHours#41 AS Beginning_Hours#594, cardNumber#42 AS Card_Number#632, inactiveDate#43 AS Inactive_Date#670, notifiedOfBalance#44L AS Notified_Of_Balance#708L, peakPeriodLevel#45L AS Peak_Period_Level#746L, startDate#46 AS Start_Date#784, aircraftTypeId#47L AS Aircraft_Type_Id#822L, memberId#48L AS Member_Id#860L, inactiveOverride#49L AS Inactive_Override#898L, discountRate#50 AS Discount_Rate#936, bookingGuaranteeExpirationDate#51 AS Booking_Guarantee_Expiration_Date#974, purchaseDate#52 AS Purchase_Date#1012, incentiveHours#53 AS Incentive_Hours#1050, incentiveValue#54 AS Incentive_Value#1088, depositBaseAmount#55 AS Deposit_Base_Amount#1126, accountNote#56 AS Account_Note#1164, ... 13 more fields]
         :     +- Exchange hashpartitioning(Year#215, Month#252, 200), REPARTITION, [id=#192]
         :        +- *(1) Project [_ts#33L, accountId#34L, updateDate#35, updateUser#36, accountType#37L, description#38, accountDescriptionId#39L, beginningBalance#40, beginningHours#41, cardNumber#42, inactiveDate#43, notifiedOfBalance#44L, peakPeriodLevel#45L, startDate#46, aircraftTypeId#47L, memberId#48L, inactiveOverride#49L, discountRate#50, bookingGuaranteeExpirationDate#51, purchaseDate#52, incentiveHours#53, incentiveValue#54, depositBaseAmount#55, accountNote#56, ... 13 more fields]
         :           +- *(1) Filter ((isnotnull(LoadDate#66) AND AtLeastNNulls(n, accountid#34L)) AND (LoadDate#66 > 2023-02-04 03:45:09.0194557))
         :              +- FileScan parquet [_type#32,_ts#33L,accountId#34L,updateDate#35,updateUser#36,accountType#37L,description#38,accountDescriptionId#39L,beginningBalance#40,beginningHours#41,cardNumber#42,inactiveDate#43,notifiedOfBalance#44L,peakPeriodLevel#45L,startDate#46,aircraftTypeId#47L,memberId#48L,inactiveOverride#49L,discountRate#50,bookingGuaranteeExpirationDate#51,purchaseDate#52,incentiveHours#53,incentiveValue#54,depositBaseAmount#55,... 11 more fields] Batched: false, DataFilters: [isnotnull(LoadDate#66), AtLeastNNulls(n, accountId#34L), (LoadDate#66 > 2023-02-04 03:45:09.0194..., Format: Parquet, Location: InMemoryFileIndex[abfss://******@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc..., PartitionFilters: [], PushedFilters: [IsNotNull(LoadDate), GreaterThan(LoadDate,2023-02-04 03:45:09.0194557)], ReadSchema: struct<_type:string,_ts:bigint,accountId:bigint,updateDate:string,updateUser:string,accountType:b...
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#210]
            +- *(4) HashAggregate(keys=[KeyId#1778L], functions=[], output=[KeyId#1778L])
               +- Exchange hashpartitioning(KeyId#1778L, 200), ENSURE_REQUIREMENTS, [id=#206]
                  +- *(3) HashAggregate(keys=[KeyId#1778L], functions=[], output=[KeyId#1778L])
                     +- *(3) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Round_Trip_Discount#1506)) AS Round_Trip_Discount#1506, knownfloatingpointnormalized(normalizenanandzero(Discount_Rate#936)) AS Discount_Rate#936, Create_Date#1240, Deleted_From_Source_Flag#1582, Account_Note#1164, Booking_Guarantee_Expiration_Date#974, Is_Membership_Transaction_Bonus_Account#1202L, Member_Id#860L, Month#252, knownfloatingpointnormalized(normalizenanandzero(Deposit_Base_Amount#1126)) AS Deposit_Base_Amount#1126, Inactive_Override#898L, Inactive_Date#670, Ato_Fund_Forfeiture_Date#1392, Account_Id#328L, Description#480, Aircraft_Type_Id#822L, Account_Description_Id#518L, Year#215, Purchase_Date#1012, Account_Type#442L, _Ts#290L, knownfloatingpointnormalized(normalizenanandzero(Peak_Day_Surcharge#1316)) AS Peak_Day_Surcharge#1316, Update_Date#366, Create_User#1278, ... 13 more fields], functions=[], output=[KeyId#1778L])
                        +- *(3) Project [_ts#33L AS _Ts#290L, accountId#34L AS Account_Id#328L, updateDate#35 AS Update_Date#366, updateUser#36 AS Update_User#404, accountType#37L AS Account_Type#442L, description#38 AS Description#480, accountDescriptionId#39L AS Account_Description_Id#518L, beginningBalance#40 AS Beginning_Balance#556, beginningHours#41 AS Beginning_Hours#594, cardNumber#42 AS Card_Number#632, inactiveDate#43 AS Inactive_Date#670, notifiedOfBalance#44L AS Notified_Of_Balance#708L, peakPeriodLevel#45L AS Peak_Period_Level#746L, startDate#46 AS Start_Date#784, aircraftTypeId#47L AS Aircraft_Type_Id#822L, memberId#48L AS Member_Id#860L, inactiveOverride#49L AS Inactive_Override#898L, discountRate#50 AS Discount_Rate#936, bookingGuaranteeExpirationDate#51 AS Booking_Guarantee_Expiration_Date#974, purchaseDate#52 AS Purchase_Date#1012, incentiveHours#53 AS Incentive_Hours#1050, incentiveValue#54 AS Incentive_Value#1088, depositBaseAmount#55 AS Deposit_Base_Amount#1126, accountNote#56 AS Account_Note#1164, ... 13 more fields]
                           +- Exchange hashpartitioning(Year#215, Month#252, 200), REPARTITION, [id=#200]
                              +- *(2) Project [_ts#33L, accountId#34L, updateDate#35, updateUser#36, accountType#37L, description#38, accountDescriptionId#39L, beginningBalance#40, beginningHours#41, cardNumber#42, inactiveDate#43, notifiedOfBalance#44L, peakPeriodLevel#45L, startDate#46, aircraftTypeId#47L, memberId#48L, inactiveOverride#49L, discountRate#50, bookingGuaranteeExpirationDate#51, purchaseDate#52, incentiveHours#53, incentiveValue#54, depositBaseAmount#55, accountNote#56, ... 13 more fields]
                                 +- *(2) Filter ((((isnotnull(LoadDate#66) AND AtLeastNNulls(n, accountid#34L)) AND (LoadDate#66 > 2023-02-04 03:45:09.0194557)) AND (CASE WHEN (_type#32 = delete) THEN 1 ELSE 0 END = 1)) AND isnotnull(accountId#34L))
                                    +- FileScan parquet [_type#32,_ts#33L,accountId#34L,updateDate#35,updateUser#36,accountType#37L,description#38,accountDescriptionId#39L,beginningBalance#40,beginningHours#41,cardNumber#42,inactiveDate#43,notifiedOfBalance#44L,peakPeriodLevel#45L,startDate#46,aircraftTypeId#47L,memberId#48L,inactiveOverride#49L,discountRate#50,bookingGuaranteeExpirationDate#51,purchaseDate#52,incentiveHours#53,incentiveValue#54,depositBaseAmount#55,... 11 more fields] Batched: false, DataFilters: [isnotnull(LoadDate#66), AtLeastNNulls(n, accountId#34L), (LoadDate#66 > 2023-02-04 03:45:09.0194..., Format: Parquet, Location:InMemoryFileIndex[abfss://******@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc..., PartitionFilters: [], PushedFilters: [IsNotNull(LoadDate), GreaterThan(LoadDate,2023-02-04 03:45:09.0194557), IsNotNull(accountId)], ReadSchema: struct<_type:string,_ts:bigint,accountId:bigint,updateDate:string,updateUser:string,accountType:b...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:163)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:359)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:425)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3019)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3018)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:181)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:94)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3018)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7.0 (TID 239) (vm-6dd71473 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://******@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more

Driver stacktrace:
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:194)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:515)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:227)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:212)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:226)
	at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter(HashJoin.scala:503)
	at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter$(HashJoin.scala:502)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.joins.HashJoin.doConsume(HashJoin.scala:358)
	at org.apache.spark.sql.execution.joins.HashJoin.doConsume$(HashJoin.scala:355)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:221)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:192)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.consume(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:633)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:769)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:170)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.joins.HashJoin.doProduce(HashJoin.scala:352)
	at org.apache.spark.sql.execution.joins.HashJoin.doProduce$(HashJoin.scala:351)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithoutKeys(HashAggregateExec.scala:244)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:168)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:658)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:721)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:172)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 40 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7.0 (TID 239) (vm-6dd71473 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://******@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2313)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2261)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2261)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1132)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1132)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1132)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2500)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2442)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2431)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:908)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2327)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1037)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:415)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1036)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:435)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:203)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	... 3 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://******@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more
Azure Synapse Analytics

1 answer

  1. KranthiPakala-MSFT (Microsoft Employee, Moderator)
    2023-03-20T17:28:51.63+00:00

    Hi @Tilahun Hieni,

    Welcome to the Microsoft Q&A forum, and thanks for reaching out here.

    Could you please share a few additional details about your implementation, such as:

    1. Which Synapse components (pipeline activities, notebooks, etc.) are being used to read the parquet files?
    2. Is the source parquet part of a Delta Lake table? If so, what is its Delta Lake version?
    3. If you are using a Spark pool to run the read, what are the Synapse Spark pool version and its Delta Lake version? (See the version-check sketch after this list.)
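
    For item 3, a minimal way to check this from a notebook cell might look like the sketch below. It assumes a Synapse PySpark notebook where the spark session is predefined; the delta-spark package name is an assumption and may not be present in every pool.

        # Minimal sketch, assuming a Synapse PySpark notebook with `spark` predefined.
        # Prints the Spark runtime version and, if the delta-spark Python package is
        # installed in the pool, its version as well.
        from importlib.metadata import version, PackageNotFoundError

        print("Spark version:", spark.version)
        try:
            # "delta-spark" is an assumed package name; it may not exist in every pool.
            print("delta-spark version:", version("delta-spark"))
        except PackageNotFoundError:
            print("delta-spark Python package not found in this pool")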

    Based on the error message, my understanding is that:

    1. There is most likely a mismatch between a column's data type in one or more source parquet files and the data type Spark expects when reading them. The ClassCastException in your trace (MutableDouble cannot be cast to MutableLong) suggests a column stored as double in at least one file while the read schema expects bigint; the diagnostic sketch after this list shows one way to confirm this.
    2. If you are sure the column data types aren't the issue, I would suggest comparing the Delta Lake version of the Delta Lake table (parquet files) against the Delta Lake version supported by your Apache Spark runtime. An incompatibility there could be another reason.
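
    One way to confirm cause 1 is to read the file named in the stack trace on its own and compare its schema against an older file in the same folder. The sketch below is only illustrative: the abfss path segments are masked in your error message, so the placeholders (and the older file name) are assumptions you would need to replace with real values.

        # Illustrative sketch: compare the schema of the file cited in the error with an
        # earlier file from the same folder. Replace the masked placeholders with real values.
        base = "abfss://<container>@<account>.dfs.core.windows.net/Atlas/<folder>/<subfolder>/account"

        suspect = spark.read.parquet(f"{base}/account_2023_02_15_130001.parquet")
        suspect.printSchema()   # look for the column that now appears as double

        older = spark.read.parquet(f"{base}/<an_older_account_file>.parquet")
        older.printSchema()     # the same column should appear as long (bigint) if the type drifted

    If one file does show the drifted type, the usual options are to rewrite that file with the expected type, or to read it separately, cast the affected column back to bigint, and union it with the rest.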

    Hope this helps. If it doesn't, please share the additional details requested above.

