Problem:
I tried to query data from the Cosmos DB Analytical Store, which is linked to my Synapse workspace, from a Spark notebook. The read fails while resolving the linked service.
Configuration:
Cosmos DB:
- Azure Synapse Service Principal has Contributor role
- My Azure User Account has Contributor role
Azure Synapse Analytics:
- My User Account has Owner role
Azure Synapse Workspace:
- My User Account has Synapse Administrator role
- My User Account has Synapse Compute Operator role
Spark:
- Spark 2.4
- Python 3.6
- Scala 2.11.12
- Java: 1.8.0_272
- Delta Lake 0.6
Remarks:
I can query the analytical store from the serverless SQL pool without problems. Only the Spark connection fails, with the error shown below.
Code:
dfStream = spark.readStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "CosmosDb_Test")\
.option("spark.cosmos.container", "TestContainer")\
.option("spark.cosmos.changeFeed.readEnabled", "true")\
.option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
.option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
.option("spark.cosmos.changeFeed.queryName", "streamQuery")\
.load()
Error:
Py4JJavaError: An error occurred while calling o208.load.
: java.lang.RuntimeException: Resolving Azure CosmosDB LinkedService [CosmosDb_Test] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly.
at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService$.fetchCosmosAccountInfo(ArcadiaLinkService.scala:112)
at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService.fetchCosmosAccountInfo(ArcadiaLinkService.scala:138)
at com.microsoft.azure.cosmos.oltp.spark.ConnectionResolver.resolveAccountInfoThroughLinkedService(ConnectionResolver.scala:31)
at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.transformConfig(CosmosOLTPSource.scala:46)
at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.sourceSchema(CosmosOLTPSource.scala:53)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:208)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Access token couldn't be obtained {"result":"DependencyError","errorId":"BadRequest","errorMessage":"LSRServiceException is [{\"StatusCode\":400,\"ErrorResponse\":{\"code\":\"LSRLinkedServiceFailure\",\"message\":\"Could not load the Linked Service\",\"target\":\"CosmosDb_Test\"},\"Message\":\"Could not load the Linked Service\",\"Data\":{},\"InnerException\":null,\"StackTrace\":\" at Microsoft.Marlin.Common.ADF.Impl.LSRClient.CheckForFailures(HttpResponseMessage response, String responseContent) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 272\\r\\n at Microsoft.Marlin.Common.ADF.Impl.LSRClient.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken, String traceId) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 288\\r\\n at Microsoft.Marlin.Common.ADF.Impl.LSRClient.ResolveLinkedServiceAsync(String linkedServiceName, ResolveAudienceRequest request, String traceId, CancellationToken cancellationToken) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 185\\r\\n at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetToken(Boolean isLinkedService, String audience, String sessionToken, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 129\\r\\n at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetTokenForAudienceAsync(Boolean isLinkedService, String audience, String account, String sessionToken, SignaturePayload signaturePayload, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 67\\r\\n at Microsoft.Marlin.TokenService.Controllers.TokenController.GetTokenAsync(TokenRequest request, CancellationToken cancellationToken) in 
C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Controllers\\\\TokenController.cs:line 67\\r\\n at lambda_method(Closure , Object )\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ActionMethodExecutor.AwaitableObjectResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeActionMethodAsync()\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeNextActionFilterAsync()\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Rethrow(ActionExecutedContext context)\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeInnerFilterAsync()\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeNextResourceFilter()\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Rethrow(ResourceExecutedContext context)\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeFilterPipelineAsync()\\r\\n at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeAsync()\\r\\n at Microsoft.AspNetCore.Routing.EndpointMiddleware.Invoke(HttpContext httpContext)\\r\\n at Microsoft.AspNetCore.Routing.EndpointRoutingMiddleware.Invoke(HttpContext httpContext)\\r\\n at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)\\r\\n at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)\\r\\n at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n at 
Microsoft.AspNetCore.Builder.Extensions.MapWhenMiddleware.Invoke(HttpContext context)\\r\\n at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.Invoke(HttpContext context)\",\"HelpLink\":null,\"Source\":\"Microsoft.Marlin.Common.ADF\",\"HResult\":-2146233088}]. TraceId : f099b286-8a14-4847-ab11-0b0434f6b9b1. Error Component : LSR"}
at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:382)
at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:374)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1808)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1807)
at com.twitter.util.Promise$FutureTransformer.liftedTree1$1(Promise.scala:240)
at com.twitter.util.Promise$FutureTransformer.k(Promise.scala:240)
at com.twitter.util.Promise$Transformer.apply(Promise.scala:215)
at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:91)
at com.twitter.util.Promise$WaitQueue$$anon$4.run(Promise.scala:86)
at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:86)
at com.twitter.util.Promise.updateIfEmpty(Promise.scala:778)
at com.twitter.util.Promise.update(Promise.scala:750)
at com.twitter.util.Promise.setValue(Promise.scala:726)
at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:123)
at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:168)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at com.twitter.finagle.netty4.http.handler.UnpoolHttpHandler$.channelRead(UnpoolHttpHandler.scala:32)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at com.twitter.finagle.netty4.http.handler.ClientExceptionMapper$.channelRead(ClientExceptionMapper.scala:35)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at com.twitter.finagle.netty4.http.handler.HeaderValidatorHandler$.channelRead(HeaderValidatorHandler.scala:51)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:483)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:383)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
return self._df(self._jreader.load())
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
return f(*a, **kw)
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
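For readability, the relevant part of the trace is the nested JSON in the "Caused by" line. Below is a minimal sketch for pulling the status code, error code, target and TraceId out of such a payload. It assumes the payload keeps the shape shown above; the helper name summarize_lsr_error and the shortened sample string (stack trace omitted) are mine and not part of any Synapse library.

```python
import json
import re


def summarize_lsr_error(payload: str) -> dict:
    """Extract the key fields from a token-library error payload (hypothetical helper)."""
    outer = json.loads(payload)
    message = outer.get("errorMessage", "")
    summary = {"result": outer.get("result"), "errorId": outer.get("errorId")}

    # The inner exception list sits between "LSRServiceException is " and ". TraceId".
    inner = re.search(r"LSRServiceException is (\[.*\])\. TraceId", message, re.S)
    if inner:
        first = json.loads(inner.group(1))[0]
        error = first.get("ErrorResponse", {})
        summary["statusCode"] = first.get("StatusCode")
        summary["code"] = error.get("code")
        summary["target"] = error.get("target")
        summary["message"] = error.get("message")

    # The TraceId is a 36-character GUID that support can use to look up the request.
    trace = re.search(r"TraceId : ([0-9a-fA-F-]{36})", message)
    if trace:
        summary["traceId"] = trace.group(1)
    return summary


# Shortened copy of the payload from the trace above (StackTrace field left out).
inner_errors = [{
    "StatusCode": 400,
    "ErrorResponse": {
        "code": "LSRLinkedServiceFailure",
        "message": "Could not load the Linked Service",
        "target": "CosmosDb_Test",
    },
}]
sample = json.dumps({
    "result": "DependencyError",
    "errorId": "BadRequest",
    "errorMessage": "LSRServiceException is " + json.dumps(inner_errors)
    + ". TraceId : f099b286-8a14-4847-ab11-0b0434f6b9b1. Error Component : LSR",
})

summary = summarize_lsr_error(sample)
print(summary)
```

Condensed this way, the failure is an HTTP 400 with code LSRLinkedServiceFailure for the target CosmosDb_Test. In other words, the linked service itself could not be loaded, before any request reached Cosmos DB.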
Steps to reproduce:
- Create Cosmos DB + Synapse Analytics
- Create the Linked Service and the Cosmos DB Analytical Store
- Paste the code above into a Spark notebook and run it
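The error message itself suggests "using the Azure CosmosDB account name and credentials directly". Below is a minimal sketch of what that might look like, for anyone who wants to check whether the problem is limited to linked-service resolution. The option names spark.cosmos.accountEndpoint, spark.cosmos.accountKey and spark.cosmos.database are my assumption about what the Synapse connector accepts and should be checked against the connector documentation. The helper direct_cosmos_options and all placeholder values are hypothetical.

```python
def direct_cosmos_options(endpoint, key, database, container):
    """Build reader options that bypass the linked service (hypothetical helper).

    The three account/database option names are assumptions; the change feed
    options are copied from the failing code above.
    """
    return {
        "spark.cosmos.accountEndpoint": endpoint,   # assumed option name
        "spark.cosmos.accountKey": key,             # assumed option name
        "spark.cosmos.database": database,          # assumed option name
        "spark.cosmos.container": container,
        "spark.cosmos.changeFeed.readEnabled": "true",
        "spark.cosmos.changeFeed.startFromTheBeginning": "true",
        "spark.cosmos.changeFeed.checkpointLocation": "/localReadCheckpointFolder",
        "spark.cosmos.changeFeed.queryName": "streamQuery",
    }


# Placeholder values; substitute the real account endpoint, key and database.
options = direct_cosmos_options(
    endpoint="https://<account>.documents.azure.com:443/",
    key="<account-key>",
    database="<database>",
    container="TestContainer",
)

# In the notebook, where `spark` is the session provided by Synapse:
# dfStream = spark.readStream.format("cosmos.oltp").options(**options).load()
```

If a read along these lines works while the linked-service variant still fails, that would point at the linked service definition or its resolution rather than at the Cosmos DB account or the role assignments.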