Synapse Spark: Reading Data from Cosmos DB - Resolving Azure CosmosDB LinkedService in Azure Synapse failed. Access token couldn't be obtained

Sandro Falter 31 Reputation points
2021-09-15T06:44:25.267+00:00

Problem:
I tried to query data from the Cosmos DB Analytical Store, which is linked to my Synapse workspace, from a Spark notebook.

Configuration:
Cosmos DB:

  • Azure Synapse Service Principal has Contributor role
  • My Azure User Account has Contributor role

Azure Synapse Analytics

  • My User Account has Owner Role

Azure Synapse Workspace

  • My User Account has Synapse Administrator
  • My User Account has Synapse Compute Operator Role

Spark:

  • Spark 2.4
  • Python 3.6
  • Scala 2.11.12
  • Java: 1.8.0_272
  • Delta Lake 0.6

Remarks:
I can query the analytical store from the serverless SQL pool. Only the Spark connection fails with this error.

Code:

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "CosmosDb_Test")\
    .option("spark.cosmos.container", "TestContainer")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Error:

Py4JJavaError: An error occurred while calling o208.load.
    : java.lang.RuntimeException: Resolving Azure CosmosDB LinkedService [CosmosDb_Test] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly.
        at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService$.fetchCosmosAccountInfo(ArcadiaLinkService.scala:112)
        at com.microsoft.azure.cosmos.analytics.spark.connector.common.ArcadiaLinkService.fetchCosmosAccountInfo(ArcadiaLinkService.scala:138)
        at com.microsoft.azure.cosmos.oltp.spark.ConnectionResolver.resolveAccountInfoThroughLinkedService(ConnectionResolver.scala:31)
        at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.transformConfig(CosmosOLTPSource.scala:46)
        at com.microsoft.azure.cosmos.oltp.spark.CosmosOLTPSource.sourceSchema(CosmosOLTPSource.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:208)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:171)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.Exception: Access token couldn't be obtained {"result":"DependencyError","errorId":"BadRequest","errorMessage":"LSRServiceException is [{\"StatusCode\":400,\"ErrorResponse\":{\"code\":\"LSRLinkedServiceFailure\",\"message\":\"Could not load the Linked Service\",\"target\":\"CosmosDb_Test\"},\"Message\":\"Could not load the Linked Service\",\"Data\":{},\"InnerException\":null,\"StackTrace\":\"   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.CheckForFailures(HttpResponseMessage response, String responseContent) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 272\\r\\n   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken, String traceId) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 288\\r\\n   at Microsoft.Marlin.Common.ADF.Impl.LSRClient.ResolveLinkedServiceAsync(String linkedServiceName, ResolveAudienceRequest request, String traceId, CancellationToken cancellationToken) in C:\\\\source\\\\Common\\\\Microsoft.Marlin.Common.ADF\\\\Impl\\\\LSRClient.cs:line 185\\r\\n   at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetToken(Boolean isLinkedService, String audience, String sessionToken, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 129\\r\\n   at Microsoft.Marlin.TokenService.Token.LSRAudienceTokenProvider.GetTokenForAudienceAsync(Boolean isLinkedService, String audience, String account, String sessionToken, SignaturePayload signaturePayload, CancellationToken cancellationToken) in C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Token\\\\LSRAudienceTokenProvider.cs:line 67\\r\\n   at Microsoft.Marlin.TokenService.Controllers.TokenController.GetTokenAsync(TokenRequest request, CancellationToken cancellationToken) in 
C:\\\\source\\\\TokenService\\\\Microsoft.Marlin.TokenService\\\\Controllers\\\\TokenController.cs:line 67\\r\\n   at lambda_method(Closure , Object )\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ActionMethodExecutor.AwaitableObjectResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeActionMethodAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeNextActionFilterAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Rethrow(ActionExecutedContext context)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ControllerActionInvoker.InvokeInnerFilterAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeNextResourceFilter()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Rethrow(ResourceExecutedContext context)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeFilterPipelineAsync()\\r\\n   at Microsoft.AspNetCore.Mvc.Internal.ResourceInvoker.InvokeAsync()\\r\\n   at Microsoft.AspNetCore.Routing.EndpointMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Microsoft.AspNetCore.Routing.EndpointRoutingMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n   at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)\\r\\n   at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)\\r\\n   at Microsoft.AspNetCore.StaticFiles.StaticFileMiddleware.Invoke(HttpContext context)\\r\\n   at 
Microsoft.AspNetCore.Builder.Extensions.MapWhenMiddleware.Invoke(HttpContext context)\\r\\n   at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.Invoke(HttpContext context)\",\"HelpLink\":null,\"Source\":\"Microsoft.Marlin.Common.ADF\",\"HResult\":-2146233088}]. TraceId : f099b286-8a14-4847-ab11-0b0434f6b9b1. Error Component : LSR"}
        at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:382)
        at com.microsoft.azure.synapse.tokenlibrary.TokenLibrary$$anonfun$9.apply(TokenLibrary.scala:374)
        at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1808)
        at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:1807)
        at com.twitter.util.Promise$FutureTransformer.liftedTree1$1(Promise.scala:240)
        at com.twitter.util.Promise$FutureTransformer.k(Promise.scala:240)
        at com.twitter.util.Promise$Transformer.apply(Promise.scala:215)
        at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:91)
        at com.twitter.util.Promise$WaitQueue$$anon$4.run(Promise.scala:86)
        at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
        at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
        at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
        at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
        at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:86)
        at com.twitter.util.Promise.updateIfEmpty(Promise.scala:778)
        at com.twitter.util.Promise.update(Promise.scala:750)
        at com.twitter.util.Promise.setValue(Promise.scala:726)
        at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:123)
        at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:168)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at com.twitter.finagle.netty4.http.handler.UnpoolHttpHandler$.channelRead(UnpoolHttpHandler.scala:32)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at com.twitter.finagle.netty4.http.handler.ClientExceptionMapper$.channelRead(ClientExceptionMapper.scala:35)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at com.twitter.finagle.netty4.http.handler.HeaderValidatorHandler$.channelRead(HeaderValidatorHandler.scala:51)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:483)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:383)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

    Traceback (most recent call last):

      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
        return self._df(self._jreader.load())

      File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)

      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
        return f(*a, **kw)

      File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)

Steps to reproduce:

  1. Create Cosmos DB + Synapse Analytics
  2. Create Linked Services and Cosmos DB Analytical Store
  3. Paste the code above into a Spark notebook and run it

Accepted answer
  1. PRADEEPCHEEKATLA-MSFT 82,271 Reputation points Microsoft Employee
    2021-09-20T09:47:21.16+00:00

    Hello @Sandro Falter ,

    Welcome to the Microsoft Q&A platform.

    The error message "Resolving Azure CosmosDB LinkedService [CosmosDb_Test] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly." indicates that Synapse could not resolve the linked service connection.

    The stack trace suggests that the Azure Cosmos DB linked service itself could not be loaded: the token service returned status code 400 with the error code "LSRLinkedServiceFailure" and the message "Could not load the Linked Service".
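    The long "Caused by" line is hard to read by eye. As a rough sketch (not part of the connector or of Synapse), the few useful fields can be pulled out of the pasted text with regular expressions. The function name `summarize_lsr_error` is hypothetical, and `SAMPLE` is an abbreviated copy of the error from the question with the embedded .NET stack trace removed.

```python
import re

# Abbreviated copy of the "Caused by" line from the question (stack trace omitted).
SAMPLE = (
    r'''Caused by: java.lang.Exception: Access token couldn't be obtained '''
    r'''{"result":"DependencyError","errorId":"BadRequest","errorMessage":'''
    r'''"LSRServiceException is [{\"StatusCode\":400,\"ErrorResponse\":'''
    r'''{\"code\":\"LSRLinkedServiceFailure\",\"message\":\"Could not load '''
    r'''the Linked Service\",\"target\":\"CosmosDb_Test\"}}]. '''
    r'''TraceId : f099b286-8a14-4847-ab11-0b0434f6b9b1. Error Component : LSR"}'''
)


def summarize_lsr_error(text):
    """Extract status code, error code, message, target and TraceId from the error text.

    The patterns tolerate the backslash-escaped quotes of the nested JSON,
    so the text can be pasted as-is from the notebook output.
    """
    def first(pattern):
        match = re.search(pattern, text)
        return match.group(1) if match else None

    status = first(r'\\*"StatusCode\\*"\s*:\s*(\d+)')
    return {
        "status_code": int(status) if status else None,
        "code": first(r'\\*"code\\*"\s*:\s*\\*"([^"\\]+)'),
        "message": first(r'\\*"message\\*"\s*:\s*\\*"([^"\\]+)'),
        "target": first(r'\\*"target\\*"\s*:\s*\\*"([^"\\]+)'),
        "trace_id": first(r'TraceId\s*:\s*([0-9a-fA-F-]{36})'),
    }


if __name__ == "__main__":
    for key, value in summarize_lsr_error(SAMPLE).items():
        print(f"{key}: {value}")
```

    The "target" field is the linked service name that failed to load, and the TraceId is the value to quote if a support request is needed.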

    You can try the steps below to connect to Azure Cosmos DB using Apache Spark 2 in Azure Synapse Link:

    Step 1: Go to the Linked tab, select the Azure Cosmos DB linked service, and test the connection.

    Step 2: Select the container and click Actions => New Notebook => Load streaming DataFrame from container.

    Step 3: Select the Spark pool and run the code to load the DataFrame from the container.

    For more details, refer to Interact with Azure Cosmos DB using Apache Spark 2 in Azure Synapse Link.
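    When comparing a hand-written notebook with a generated one, it can help to keep the read options in one place so that the linked service and container names are easy to check against what Synapse Studio shows. The following is a minimal sketch, assuming the same option keys as the code in the question; the helper names `cosmos_stream_options` and `load_cosmos_stream` are hypothetical and not part of the connector.

```python
def cosmos_stream_options(linked_service, container,
                          checkpoint="/localReadCheckpointFolder",
                          query_name="streamQuery"):
    """Collect the change-feed read options used in the question into one dict."""
    if not linked_service or not container:
        raise ValueError("linked service and container names are required")
    return {
        "spark.synapse.linkedService": linked_service,
        "spark.cosmos.container": container,
        "spark.cosmos.changeFeed.readEnabled": "true",
        "spark.cosmos.changeFeed.startFromTheBeginning": "true",
        "spark.cosmos.changeFeed.checkpointLocation": checkpoint,
        "spark.cosmos.changeFeed.queryName": query_name,
    }


def load_cosmos_stream(spark, options):
    """Start the streaming read; `spark` is the SparkSession provided by the notebook."""
    return spark.readStream.format("cosmos.oltp").options(**options).load()
```

    In a Synapse notebook this would be called as `dfStream = load_cosmos_stream(spark, cosmos_stream_options("CosmosDb_Test", "TestContainer"))`. The linked service name must match the name shown in the workspace exactly, since the error in the question is raised while resolving that name.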

    Hope this helps. Please let us know if you have any further questions.



1 additional answer

  1. Sonali Dubey 0 Reputation points Microsoft Employee
    2024-03-22T10:45:50.1833333+00:00

    I am trying the same approach, but it is not working for me. I get the error below:

    Py4JJavaError: An error occurred while calling o4081.load.
    : java.lang.RuntimeException: Resolving Azure CosmosDB LinkedService [SynapseCosmosDbMIConnect] in Azure Synapse failed.Validate the configured LinkedService. If still seeing this, try using the Azure CosmosDB account name and credentials directly.
        at com.microsoft.azure.cosmos.analytics.spark.connector.api.ArcadiaLinkService$.fetchCosmosAccountInfo(ArcadiaLinkService.scala:118)
        at com.microsoft.azure.cosmos.analytics.spark.connector.api.ArcadiaLinkService.fetchCosmosAccountInfo(ArcadiaLinkService.scala:142)
        at
