Execute a custom query on Azure Cosmos DB using Spark in Azure Synapse

Abhiram Duvvuru (Microsoft Employee)
2023-06-17T01:54:24.03+00:00

Hi Team,

I want to run a custom query against Cosmos DB from Spark using Azure Synapse Link (Synapse workspace), but I get the error below. Is there any workaround for running a custom query?

Code:

df = spark.read.format("cosmos.olap")\
    .option("spark.cosmos.accountEndpoint", "xxxx")\
    .option("spark.cosmos.accountKey", "xxxx")\
    .option("spark.cosmos.container", "xxxx")\
    .option("spark.cosmos.database", "xxx")\
    .option("spark.cosmos.read.customQuery",
            "SELECT a.id,b.id FROM a JOIN b in a.EventTypes "
            "WHERE a._type = 'xxx'")\
    .load()

Error:

--------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
/tmp/ipykernel_11236/3724999061.py in <module>
      1 # To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
      2 
----> 3 df = spark.read.format("cosmos.olap")\
      4     .option("spark.cosmos.accountEndpoint", "xxxxxxxxxxxxxxxxx")\
      5     .option("spark.cosmos.accountKey", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx")\

/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    208             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    209         else:
--> 210             return self._df(self._jreader.load())
    211 
    212     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,

~/cluster-env/clonedenv/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

IllegalArgumentException: Unknown cosmos configuration was specified: spark.cosmos.read.customquery

Thanks,

Abhiram


Accepted answer
  Sedat SALMAN (MVP)
    2023-06-17T14:13:41.5+00:00

    https://stackoverflow.com/questions/68151153/cosmos-db-spatial-query-using-spark

    https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/quickstart-spark

    https://github.com/Azure/azure-cosmosdb-spark/wiki/Configuration-references

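    You can try using the cosmos.oltp format instead. It reads the transactional store rather than the analytical store, so it consumes request units (RUs), but it accepts the spark.cosmos.read.customQuery option: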

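    # Placeholder connection values (replace with your own); `spark` is the
    # session that Synapse notebooks provide automatically.
    cosmosEndpoint = "<cosmos-account-endpoint>"
    cosmosMasterKey = "<cosmos-account-key>"
    cosmosDatabaseName = "<database>"
    cosmosContainerName = "<container>"

    outlets_cfg = {
      "spark.cosmos.accountEndpoint" : cosmosEndpoint,
      "spark.cosmos.accountKey" : cosmosMasterKey,
      "spark.cosmos.database" : cosmosDatabaseName,
      "spark.cosmos.container" : cosmosContainerName,
      "spark.cosmos.read.customQuery" : "SELECT * FROM c WHERE ST_DISTANCE(c.location,{\"type\":\"Point\",\"coordinates\": [12.832489, 18.9553242]}) < 1000"
    }
    
    df = spark.read.format("cosmos.oltp").options(**outlets_cfg)\
     .option("spark.cosmos.read.inferSchema.enabled", "true")\
     .load()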
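Applied to the query from the question, the cosmos.oltp approach might look like the minimal sketch below. It keeps the question's placeholder filter value 'xxx'; the helper names build_oltp_read_config and read_custom_query are hypothetical, and the AS aliases are an addition so that the two id values land in distinct DataFrame columns. Whether the connector returns JOIN results exactly as a query run directly against Cosmos DB would has not been verified here.

```python
from typing import Dict

# The question's query; the AS aliases are a hypothetical addition so the
# parent id and the array element id become two distinct columns.
JOIN_QUERY = (
    "SELECT a.id AS parentId, b.id AS eventTypeId "
    "FROM a JOIN b IN a.EventTypes "
    "WHERE a._type = 'xxx'"
)


def build_oltp_read_config(endpoint: str, key: str, database: str,
                           container: str, query: str = JOIN_QUERY) -> Dict[str, str]:
    """Hypothetical helper: builds options for spark.read.format("cosmos.oltp")."""
    return {
        "spark.cosmos.accountEndpoint": endpoint,
        "spark.cosmos.accountKey": key,
        "spark.cosmos.database": database,
        "spark.cosmos.container": container,
        # Processed against Cosmos DB instead of a connector-generated query.
        "spark.cosmos.read.customQuery": query,
    }


def read_custom_query(spark, cfg: Dict[str, str]):
    """Hypothetical helper: reads the transactional store (consumes RUs)."""
    return (
        spark.read.format("cosmos.oltp")
        .options(**cfg)
        .option("spark.cosmos.read.inferSchema.enabled", "true")
        .load()
    )
```

In a Synapse notebook this would be called as `read_custom_query(spark, build_oltp_read_config(cosmosEndpoint, cosmosMasterKey, cosmosDatabaseName, cosmosContainerName))`. Keeping the config construction separate from the read makes the options easy to inspect before any RUs are spent.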
    
    

    The Microsoft Learn quickstart also reads with cosmos.oltp, but applies the filter in Spark instead of in a custom query:

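    from pyspark.sql.functions import col
    cfg = {
      "spark.cosmos.accountEndpoint" : cosmosEndpoint,
      "spark.cosmos.accountKey" : cosmosMasterKey,
      "spark.cosmos.database" : cosmosDatabaseName,
      "spark.cosmos.container" : cosmosContainerName
    }
    df = spark.read.format("cosmos.oltp").options(**cfg)\
     .option("spark.cosmos.read.inferSchema.enabled", "true")\
     .load()
    df.filter(col("isAlive") == True)\
     .show()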
    

    The Configuration-references wiki linked above belongs to the older azure-cosmosdb-spark connector, where a setting called query_custom overrides the default query. That setting is not used by the cosmos.oltp or cosmos.olap formats; their equivalent is spark.cosmos.read.customQuery, which cosmos.oltp accepts and cosmos.olap rejects, as the error in the question shows. Whether a JOIN like the one in the question works through customQuery on cosmos.oltp has not been confirmed here.
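If the data has to come from the analytical store (cosmos.olap), another workaround is to read the container without a custom query and express the JOIN in Spark. In Cosmos DB SQL, `JOIN b IN a.EventTypes` pairs each document with every element of its own EventTypes array, which is what `explode` does on a DataFrame. The sketch below gives a plain-Python model of that semantics plus a PySpark version. It assumes `_type` and `EventTypes` appear as top-level columns in the analytical-store schema and that each EventTypes element is an object with an `id` field; the function names are hypothetical.

```python
from typing import Any, Dict, List


def join_event_types(docs: List[Dict[str, Any]], type_value: str) -> List[Dict[str, Any]]:
    """Plain-Python model of:
    SELECT a.id, b.id FROM a JOIN b IN a.EventTypes WHERE a._type = <type_value>
    Each matching parent document is paired with every element of its own array;
    documents without an EventTypes array produce no rows.
    """
    rows = []
    for a in docs:
        if a.get("_type") != type_value:
            continue
        for b in a.get("EventTypes") or []:
            rows.append({"parentId": a.get("id"), "eventTypeId": b.get("id")})
    return rows


def join_event_types_spark(df, type_value: str):
    """Same projection on a DataFrame read with spark.read.format("cosmos.olap")."""
    # Imported lazily so the module can be loaded without PySpark installed.
    from pyspark.sql.functions import col, explode

    return (
        df.where(col("_type") == type_value)
        # explode emits one row per array element and drops null/empty arrays.
        .select(col("id").alias("parentId"), explode(col("EventTypes")).alias("b"))
        .select("parentId", col("b.id").alias("eventTypeId"))
    )
```

For example, after loading `olap_df` with the cosmos.olap options from the question (without the customQuery option), `join_event_types_spark(olap_df, "xxx").show()` should return one row per EventTypes element of each matching document. This keeps the read on the analytical store, so it does not consume RUs from the transactional store.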

