How to work with sparklyr and a lake database in Azure Synapse?

Tjerkstra P. (Pieter) 30 Reputation points
2023-09-19T07:18:52.5166667+00:00

I would like to work with a lake database using sparklyr from Azure Synapse.

If I run a SparkR notebook, I can get the desired results as follows:

# Read the lake database table into a SparkDataFrame
schade2 <- tableToDF("silver.schade")
head(schade2)
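
For completeness, the same read can also be written as a SparkR SQL query (a minimal equivalent sketch using SparkR's standard sql() function against the same silver.schade table):

# Equivalent SparkR read expressed as a SQL query
schade_sql <- sql("SELECT * FROM silver.schade")
head(schade_sql)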


However, if I try the same using sparklyr:

library(sparklyr)  # SparkR is already attached in a Synapse SparkR notebook

spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", config = config)

schade2 <- spark_read_table(sc, "silver.schade")

I get the error below. Can anyone give a hint? Thanks in advance!

Error: [1] "Error: org.apache.spark.SparkException: The Spark SQL phase analysis failed with an internal error. Please, fill a bug report in, and provide the full stack trace.\n\tat org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:560)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:572)\n\tat org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:227)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\n\tat org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:226)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:81)\n\tat org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:79)\n\tat org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:71)\n\tat org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\n\tat org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:90)\n\tat org.apache.spark.sql.DataFrameReader.table(DataFrameReader.scala:619)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat sparklyr.Invoke.invoke(invoke.scala:161)\n\tat sparklyr.StreamHandler.handleMethodCall(stream.scala:141)\n\tat sparklyr.StreamHandler.read(stream.scala:62)\n\tat sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:60)\n\tat scala.util.control.Breaks.breakable(Breaks.scala:42)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:41)\n\tat sparklyr.BackendHandler.channelRead0(handler.scala:14)\n\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.NullPointerException: inputStream\n\tat java.util.Objects.requireNonNull(Objects.java:228)\n\tat org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1305)\n\tat org.apache.commons.io.IOUtils.copy(IOUtils.java:978)\n\tat org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1282)\n\tat org.apache.commons.io.IOUtils.copy(IOUtils.java:953)\n\tat org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:2405)\n\tat org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:278)\n\tat org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:272)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:405)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\tat org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:327)\n\tat org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:505)\n\tat org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:365)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createDatabase$1(HiveExternalCatalog.scala:192)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.createDatabase(HiveExternalCatalog.scala:192)\n\tat org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:155)\n\tat org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:141)\n\tat org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:173)\n\tat org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:169)\n\tat org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:135)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:135)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupGlobalTempView(SessionCatalog.scala:1003)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.lookupTempView(Analyzer.scala:1180)\n\tat 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1264)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1136)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1100)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)\n\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)\n\tat org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1100)\n\tat org.apache.spark.sql.catalyst.analys"

Azure Synapse Analytics

Accepted answer

QuantumCache 20,266 Reputation points
2023-09-21T20:30:22.22+00:00

Hello @Tjerkstra P. (Pieter),

Posting the solution from the original poster (Tjerkstra P. (Pieter)) as an answer, so that other members can find it useful.

Below is the troubleshooting outcome we have on this escalation; please let us know if we can close this case.

Adding the version for reference: this worked on Spark 3.3. The following code worked:

%%sparkr

library(sparklyr)

spark_version <- sparkR.version()
config <- spark_config()

# The key change: connect with method = "synapse" so that sparklyr attaches
# to the existing Synapse-managed Spark session
sc <- spark_connect(master = "yarn", config = config, method = "synapse", version = spark_version)
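
With this connection, the original read from the question should then work unchanged (a sketch, assuming the same silver.schade lake database table as in the question):

# Read the lake database table through the Synapse-aware connection
schade2 <- spark_read_table(sc, "silver.schade")
head(schade2)

# Or, equivalently, query it with sparklyr's sdf_sql()
schade2_sql <- sdf_sql(sc, "SELECT * FROM silver.schade")

As I understand it, method = "synapse" makes sparklyr attach to the Spark session that Synapse already manages instead of bootstrapping its own, which avoids the Hive metastore classloading failure (the NullPointerException in IsolatedClientLoader) seen in the trace.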

