Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files

Tilahun Hieni 0 Reputation points
2023-03-15T15:04:49.8033333+00:00
Py4JJavaError: An error occurred while calling o2391.count.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#216]
+- *(5) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#2072L])
   +- *(5) Project
      +- *(5) BroadcastHashJoin [KeyId#1696L], [KeyId#1778L], LeftOuter, BuildRight, false
         :- *(5) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Round_Trip_Discount#1506)) AS Round_Trip_Discount#1506, knownfloatingpointnormalized(normalizenanandzero(Discount_Rate#936)) AS Discount_Rate#936, Create_Date#1240, Deleted_From_Source_Flag#1582, Account_Note#1164, Booking_Guarantee_Expiration_Date#974, Is_Membership_Transaction_Bonus_Account#1202L, Member_Id#860L, Month#252, knownfloatingpointnormalized(normalizenanandzero(Deposit_Base_Amount#1126)) AS Deposit_Base_Amount#1126, Inactive_Override#898L, Inactive_Date#670, Ato_Fund_Forfeiture_Date#1392, Account_Id#328L, Description#480, Aircraft_Type_Id#822L, Account_Description_Id#518L, Year#215, Purchase_Date#1012, Account_Type#442L, _Ts#290L, knownfloatingpointnormalized(normalizenanandzero(Peak_Day_Surcharge#1316)) AS Peak_Day_Surcharge#1316, Update_Date#366, Create_User#1278, ... 13 more fields], functions=[], output=[KeyId#1696L])
         :  +- *(5) Project [_ts#33L AS _Ts#290L, accountId#34L AS Account_Id#328L, updateDate#35 AS Update_Date#366, updateUser#36 AS Update_User#404, accountType#37L AS Account_Type#442L, description#38 AS Description#480, accountDescriptionId#39L AS Account_Description_Id#518L, beginningBalance#40 AS Beginning_Balance#556, beginningHours#41 AS Beginning_Hours#594, cardNumber#42 AS Card_Number#632, inactiveDate#43 AS Inactive_Date#670, notifiedOfBalance#44L AS Notified_Of_Balance#708L, peakPeriodLevel#45L AS Peak_Period_Level#746L, startDate#46 AS Start_Date#784, aircraftTypeId#47L AS Aircraft_Type_Id#822L, memberId#48L AS Member_Id#860L, inactiveOverride#49L AS Inactive_Override#898L, discountRate#50 AS Discount_Rate#936, bookingGuaranteeExpirationDate#51 AS Booking_Guarantee_Expiration_Date#974, purchaseDate#52 AS Purchase_Date#1012, incentiveHours#53 AS Incentive_Hours#1050, incentiveValue#54 AS Incentive_Value#1088, depositBaseAmount#55 AS Deposit_Base_Amount#1126, accountNote#56 AS Account_Note#1164, ... 13 more fields]
         :     +- Exchange hashpartitioning(Year#215, Month#252, 200), REPARTITION, [id=#192]
         :        +- *(1) Project [_ts#33L, accountId#34L, updateDate#35, updateUser#36, accountType#37L, description#38, accountDescriptionId#39L, beginningBalance#40, beginningHours#41, cardNumber#42, inactiveDate#43, notifiedOfBalance#44L, peakPeriodLevel#45L, startDate#46, aircraftTypeId#47L, memberId#48L, inactiveOverride#49L, discountRate#50, bookingGuaranteeExpirationDate#51, purchaseDate#52, incentiveHours#53, incentiveValue#54, depositBaseAmount#55, accountNote#56, ... 13 more fields]
         :           +- *(1) Filter ((isnotnull(LoadDate#66) AND AtLeastNNulls(n, accountid#34L)) AND (LoadDate#66 > 2023-02-04 03:45:09.0194557))
         :              +- FileScan parquet [_type#32,_ts#33L,accountId#34L,updateDate#35,updateUser#36,accountType#37L,description#38,accountDescriptionId#39L,beginningBalance#40,beginningHours#41,cardNumber#42,inactiveDate#43,notifiedOfBalance#44L,peakPeriodLevel#45L,startDate#46,aircraftTypeId#47L,memberId#48L,inactiveOverride#49L,discountRate#50,bookingGuaranteeExpirationDate#51,purchaseDate#52,incentiveHours#53,incentiveValue#54,depositBaseAmount#55,... 11 more fields] Batched: false, DataFilters: [isnotnull(LoadDate#66), AtLeastNNulls(n, accountId#34L), (LoadDate#66 > 2023-02-04 03:45:09.0194..., Format: Parquet, Location: InMemoryFileIndex[abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc..., PartitionFilters: [], PushedFilters: [IsNotNull(LoadDate), GreaterThan(LoadDate,2023-02-04 03:45:09.0194557)], ReadSchema: struct<_type:string,_ts:bigint,accountId:bigint,updateDate:string,updateUser:string,accountType:b...
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#210]
            +- *(4) HashAggregate(keys=[KeyId#1778L], functions=[], output=[KeyId#1778L])
               +- Exchange hashpartitioning(KeyId#1778L, 200), ENSURE_REQUIREMENTS, [id=#206]
                  +- *(3) HashAggregate(keys=[KeyId#1778L], functions=[], output=[KeyId#1778L])
                     +- *(3) HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Round_Trip_Discount#1506)) AS Round_Trip_Discount#1506, knownfloatingpointnormalized(normalizenanandzero(Discount_Rate#936)) AS Discount_Rate#936, Create_Date#1240, Deleted_From_Source_Flag#1582, Account_Note#1164, Booking_Guarantee_Expiration_Date#974, Is_Membership_Transaction_Bonus_Account#1202L, Member_Id#860L, Month#252, knownfloatingpointnormalized(normalizenanandzero(Deposit_Base_Amount#1126)) AS Deposit_Base_Amount#1126, Inactive_Override#898L, Inactive_Date#670, Ato_Fund_Forfeiture_Date#1392, Account_Id#328L, Description#480, Aircraft_Type_Id#822L, Account_Description_Id#518L, Year#215, Purchase_Date#1012, Account_Type#442L, _Ts#290L, knownfloatingpointnormalized(normalizenanandzero(Peak_Day_Surcharge#1316)) AS Peak_Day_Surcharge#1316, Update_Date#366, Create_User#1278, ... 13 more fields], functions=[], output=[KeyId#1778L])
                        +- *(3) Project [_ts#33L AS _Ts#290L, accountId#34L AS Account_Id#328L, updateDate#35 AS Update_Date#366, updateUser#36 AS Update_User#404, accountType#37L AS Account_Type#442L, description#38 AS Description#480, accountDescriptionId#39L AS Account_Description_Id#518L, beginningBalance#40 AS Beginning_Balance#556, beginningHours#41 AS Beginning_Hours#594, cardNumber#42 AS Card_Number#632, inactiveDate#43 AS Inactive_Date#670, notifiedOfBalance#44L AS Notified_Of_Balance#708L, peakPeriodLevel#45L AS Peak_Period_Level#746L, startDate#46 AS Start_Date#784, aircraftTypeId#47L AS Aircraft_Type_Id#822L, memberId#48L AS Member_Id#860L, inactiveOverride#49L AS Inactive_Override#898L, discountRate#50 AS Discount_Rate#936, bookingGuaranteeExpirationDate#51 AS Booking_Guarantee_Expiration_Date#974, purchaseDate#52 AS Purchase_Date#1012, incentiveHours#53 AS Incentive_Hours#1050, incentiveValue#54 AS Incentive_Value#1088, depositBaseAmount#55 AS Deposit_Base_Amount#1126, accountNote#56 AS Account_Note#1164, ... 13 more fields]
                           +- Exchange hashpartitioning(Year#215, Month#252, 200), REPARTITION, [id=#200]
                              +- *(2) Project [_ts#33L, accountId#34L, updateDate#35, updateUser#36, accountType#37L, description#38, accountDescriptionId#39L, beginningBalance#40, beginningHours#41, cardNumber#42, inactiveDate#43, notifiedOfBalance#44L, peakPeriodLevel#45L, startDate#46, aircraftTypeId#47L, memberId#48L, inactiveOverride#49L, discountRate#50, bookingGuaranteeExpirationDate#51, purchaseDate#52, incentiveHours#53, incentiveValue#54, depositBaseAmount#55, accountNote#56, ... 13 more fields]
                                 +- *(2) Filter ((((isnotnull(LoadDate#66) AND AtLeastNNulls(n, accountid#34L)) AND (LoadDate#66 > 2023-02-04 03:45:09.0194557)) AND (CASE WHEN (_type#32 = delete) THEN 1 ELSE 0 END = 1)) AND isnotnull(accountId#34L))
                                    +- FileScan parquet [_type#32,_ts#33L,accountId#34L,updateDate#35,updateUser#36,accountType#37L,description#38,accountDescriptionId#39L,beginningBalance#40,beginningHours#41,cardNumber#42,inactiveDate#43,notifiedOfBalance#44L,peakPeriodLevel#45L,startDate#46,aircraftTypeId#47L,memberId#48L,inactiveOverride#49L,discountRate#50,bookingGuaranteeExpirationDate#51,purchaseDate#52,incentiveHours#53,incentiveValue#54,depositBaseAmount#55,... 11 more fields] Batched: false, DataFilters: [isnotnull(LoadDate#66), AtLeastNNulls(n, accountId#34L), (LoadDate#66 > 2023-02-04 03:45:09.0194..., Format: Parquet, Location:InMemoryFileIndex[abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc..., PartitionFilters: [], PushedFilters: [IsNotNull(LoadDate), GreaterThan(LoadDate,2023-02-04 03:45:09.0194557), IsNotNull(accountId)], ReadSchema: struct<_type:string,_ts:bigint,accountId:bigint,updateDate:string,updateUser:string,accountType:b...

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:163)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:359)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:425)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3019)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3018)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:181)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:94)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3018)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7.0 (TID 239) (vm-6dd71473 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more

Driver stacktrace:
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:194)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:515)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:227)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:212)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:226)
	at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter(HashJoin.scala:503)
	at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter$(HashJoin.scala:502)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.joins.HashJoin.doConsume(HashJoin.scala:358)
	at org.apache.spark.sql.execution.joins.HashJoin.doConsume$(HashJoin.scala:355)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:221)
	at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:192)
	at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.consume(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:633)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:769)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:170)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.joins.HashJoin.doProduce(HashJoin.scala:352)
	at org.apache.spark.sql.execution.joins.HashJoin.doProduce$(HashJoin.scala:351)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:41)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithoutKeys(HashAggregateExec.scala:244)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:168)
	at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:48)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:658)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:721)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:218)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:256)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:214)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:172)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 40 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 7.0 failed 4 times, most recent failure: Lost task 8.3 in stage 7.0 (TID 239) (vm-6dd71473 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2313)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2261)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2261)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1132)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1132)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1132)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2500)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2442)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2431)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:908)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2306)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2327)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1037)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:415)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1036)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:435)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:203)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:217)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:762)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	... 3 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/account_2023_02_15_130001.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:111)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
	... 17 more
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableLong
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.setLong(SpecificInternalRow.scala:303)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.setLong(ParquetRowConverter.scala:179)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addLong(ParquetRowConverter.scala:89)
	at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
	... 22 more
Azure Synapse Analytics

1 answer

  1. KranthiPakala-MSFT 46,422 Reputation points Microsoft Employee
    2023-03-20T17:28:51.63+00:00

    Hi @Tilahun Hieni,

    Welcome to the Microsoft Q&A forum, and thanks for reaching out here.

    Could you please provide a few additional details about your implementation, such as the following:

    1. Which Synapse components (pipeline activities, notebooks, etc.) are being used to read the Parquet files?
    2. Is your source Parquet data part of a Delta Lake table? If so, which Delta Lake version was it written with?
    3. If you are using a Spark pool to run the read, which Synapse Spark pool version (and corresponding Delta Lake version) is it?

    Based on the error message, my understanding is that:

    1. There may be a mismatch between the data type of a column as stored in one or more of the source Parquet files and the data type Spark expects for that column. The ClassCastException (MutableDouble cannot be cast to MutableLong) suggests that a column Spark expects as bigint is stored as double in at least one file (see the diagnostic sketch after this list).
    2. If you are sure the column data types aren't the issue, I would suggest comparing the Delta Lake version used to write the table (Parquet files) against the Delta Lake version that ships with your Spark runtime. An incompatibility between the two could be another cause.
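    If the data-type mismatch in point 1 is the cause, a quick way to confirm it is to compare the schema written into the failing file (the one named in the ParquetDecodingException) against the schema of the rest of the folder. Below is a minimal PySpark sketch of that check plus one possible repair; it is only an illustration, not a verified fix: the storage paths are the redacted placeholders from your stack trace, and the column name accountType is a hypothetical example, so substitute your real values.

    # Diagnostic sketch (PySpark); paths and the column name are placeholders/assumptions.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    folder = "abfss://zzzzz@xxxxxxxxx.dfs.core.windows.net/Atlas/yyyyyy/cccccc/account/"
    bad_file = folder + "account_2023_02_15_130001.parquet"  # file named in the error

    # 1) Compare the schema stored in the failing file with the schema of the folder.
    #    The ClassCastException (MutableDouble -> MutableLong) suggests one column is
    #    written as double in this file but expected as bigint elsewhere.
    spark.read.parquet(bad_file).printSchema()
    spark.read.parquet(folder).printSchema()

    # 2) Once the offending column is identified, one possible workaround is to read
    #    only the bad file, cast the column back to the expected type, and write a
    #    corrected copy to a separate location before swapping the files.
    #    "accountType" below is a hypothetical example column.
    fixed = (spark.read.parquet(bad_file)
                  .withColumn("accountType", F.col("accountType").cast("bigint")))
    fixed.write.mode("overwrite").parquet(folder + "repaired/account_2023_02_15_130001")

    If several files are affected, it may be simpler to rewrite the whole folder with an explicit schema than to repair the files one by one.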

    Hope this helps. If it doesn't, please share the additional details requested above.