Help with Delta load PySpark script error.

Anonymous
2024-11-11T14:53:35+00:00

The script below is from a notebook. The same template has been used successfully for other Delta tables - why not this one?! Thanks in advance.

Creating my dataframe (source data)

dfNewData = spark.read.load('abfss://******@xxxxxxxxxxxxxxxuat.dfs.core.windows.net/bronze/C365 compliance/entities/', format='parquet')

dfNewData.createOrReplaceTempView('entities')


-- Using SQL to adopt the CETAS approach - creating the Delta table if it isn't there.

%%sql
CREATE TABLE IF NOT EXISTS C365.entities
USING DELTA
LOCATION 'abfss://******@xxxxxxxxxxxxxxxxuat.dfs.core.windows.net/silver/C365/entities'
AS SELECT * FROM entities;

Merge data.

import delta

FactEntities = delta.DeltaTable.forPath(spark, 'abfss://******@xxxxxxxxxxxxxxxxxxxuat.dfs.core.windows.net/silver/C365/entities/')

# Merge new data
(
    FactEntities.alias('entities')
    .merge(
        dfNewData.alias('New'),
        "entities.uprn = New.uprn"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)


ERROR DETAILS

Py4JJavaError                             Traceback (most recent call last)
Cell In[25], line 13
      2 FactEntities = delta.DeltaTable.forPath(spark, 'abfss://******@xxxxxxxxxxxxxxxxxxxuat.dfs.core.windows.net/silver/C365/entities/')
      4 # Merge new data
      5 (
      6     FactEntities.alias('entities')
      7     .merge(
      8         dfNewData.alias('New'),
      9         "entities.uprn = New.uprn"
     10     )
     11     .whenMatchedUpdateAll()
     12     .whenNotMatchedInsertAll()
---> 13     .execute()
     14 )

File /usr/hdp/current/spark3-client/jars/delta-core_2.12-2.4.0.19.jar/delta/tables.py:1051, in DeltaMergeBuilder.execute(self)
   1044 @since(0.4)  # type: ignore[arg-type]
   1045 def execute(self) -> None:
   1046     """
   1047     Execute the merge operation based on the built matched and not matched actions.
   1048
   1049     See :py:class:`~delta.tables.DeltaMergeBuilder` for complete usage details.
   1050     """
-> 1051     self._jbuilder.execute()

File ~/cluster-env/env/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File ~/cluster-env/env/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o5340.execute.
: org.apache.spark.sql.delta.DeltaUnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches.
Please refer to https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException(DeltaErrors.scala:1102)
	at org.apache.spark.sql.delta.DeltaErrorsBase.multipleSourceRowMatchingTargetRowInMergeException$(DeltaErrors.scala:1099)
	at org.apache.spark.sql.delta.DeltaErrors$.multipleSourceRowMatchingTargetRowInMergeException(DeltaErrors.scala:2808)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$findTouchedFilesLowShuffleMerge$1(MergeIntoCommand.scala:689)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:1263)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.findTouchedFilesLowShuffleMerge(MergeIntoCommand.scala:600)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$5(MergeIntoCommand.scala:313)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.withStatusCode(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:309)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:247)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:237)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:247)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:141)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:139)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:134)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation(SynapseLoggingShim.scala:111)
	at com.microsoft.spark.telemetry.delta.SynapseLoggingShim.recordOperation$(SynapseLoggingShim.scala:93)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:133)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:113)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:245)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runOrig$1(MergeIntoCommand.scala:238)
	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries(MergeIntoMaterializeSource.scala:103)
	at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries$(MergeIntoMaterializeSource.scala:91)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runWithMaterializedSourceLostRetries(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.runOrig(MergeIntoCommand.scala:238)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:201)
	at org.apache.spark.sql.delta.sources.SQLConfUtils$.withGlutenDisabled(SQLConfUtils.scala:39)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:201)
	at io.delta.tables.DeltaMergeBuilder.executeImpl(DeltaMergeBuilder.scala:306)
	at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$2(DeltaMergeBuilder.scala:269)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at io.delta.tables.DeltaMergeBuilder.withActiveSession(DeltaMergeBuilder.scala:277)
	at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:269)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:105)
	at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:91)
	at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:148)
	at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:266)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)



Accepted answer
    Anonymous
    2024-11-12T03:32:19+00:00

    Hi AC (BI),

    Thank you for posting in the Microsoft Community Forums.

    Read Data:

    Your read section looks correct; just make sure the storage account, path, and format (parquet) are all right.
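
    A quick sanity check of the load can save time here (a minimal sketch; nothing below is specific to your data):

        # Confirm the parquet files were found and the schema looks right
        dfNewData.printSchema()
        print(f"Source rows: {dfNewData.count()}")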

    Creating temporary views:

    Creating the temporary view entities is also correct; it lets you reference the source data in subsequent SQL queries.
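
    For example, you can confirm the view is queryable before moving on (an optional check):

        # The temp view should return the same count as the dataframe
        spark.sql('SELECT COUNT(*) AS n FROM entities').show()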

    Create the Delta table using CETAS:

    Creating a Delta table in a SQL cell using the CETAS (Create Table As Select) syntax is correct. However, there are a few things to keep in mind:

    Make sure that the C365 database already exists in your metastore and that you are authorized to create tables in it (or in the default database).

    Ensure that the specified path is valid and that the Spark cluster has permission to access it; see the sketch below for a quick check.
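
    A minimal sketch of those two checks (the database name and path come from your script):

        # Make sure the C365 database exists before the CREATE TABLE ... AS SELECT runs
        spark.sql('CREATE DATABASE IF NOT EXISTS C365')

        # Confirm the table registered and the location is readable
        spark.sql('SHOW TABLES IN C365').show()
        spark.read.format('delta').load('abfss://******@xxxxxxxxxxxxxxxxuat.dfs.core.windows.net/silver/C365/entities').printSchema()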

    Delta table merge operations:

    There are several potential issues here:

    Path to the DeltaTable object: Make sure that the path to the FactEntities object matches the path you defined in CETAS.

    Alias usage: You are using the aliases entities and New correctly in the merge method. However, since uprn is the identifier you are merging on, make sure that this field exists in both datasets, has the same data type, and uniquely identifies a row in the source.

    Performing the merge: Your merge logic (whenMatchedUpdateAll().whenNotMatchedInsertAll()) is syntactically correct: records that match on uprn are updated, and non-matching records are inserted. However, the DeltaUnsupportedOperationException you posted means that several source rows share the same uprn, so the merge cannot decide which one should update a given target row. As the error message itself suggests, preprocess the source to eliminate duplicate matches; see the sketch after this list.
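
    A minimal sketch of that preprocessing (dropDuplicates keeps an arbitrary row per key; if your source has a timestamp or version column, prefer an ordering that keeps the latest row instead):

        from pyspark.sql import functions as F

        # Diagnose: list uprn values that appear more than once in the source
        dfNewData.groupBy('uprn').count().filter(F.col('count') > 1).show()

        # Fix: keep a single row per uprn, then run the same merge
        dfDeduped = dfNewData.dropDuplicates(['uprn'])

        (
            FactEntities.alias('entities')
            .merge(dfDeduped.alias('New'), "entities.uprn = New.uprn")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )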

    Script structure:

    Your script mixes PySpark code and SQL cells. Make sure these cells are executed in an environment that supports the %%sql magic (such as Synapse, Fabric, or Databricks notebooks).

    Error Handling:

    In real-world deployments, it's good practice to add error handling (e.g., try-except blocks) to catch and handle exceptions such as file access issues or network problems, as sketched below.
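
    A sketch of that pattern around the merge (the message text is illustrative; dfDeduped is from the earlier sketch):

        # Surface a readable message instead of a raw Py4J traceback
        try:
            (
                FactEntities.alias('entities')
                .merge(dfDeduped.alias('New'), "entities.uprn = New.uprn")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        except Exception as e:
            print(f'Merge into C365.entities failed: {e}')
            raise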

    If you still can't resolve the problem when executing your scripts, you may want to seek advice from a coding professional.

    Best regards

    Neuvi

