Here is the code that I am using:
def generate_master_key_authorization_signature(verb, resource_type, resource_link, date, key):
    """Build the URL-encoded master-key authorization token for a Cosmos DB REST request.

    The string that gets signed is ``verb``, ``resource_type``,
    ``resource_link`` and ``date`` joined by newlines and terminated by an
    extra empty line. It is signed with HMAC-SHA256 using the base64-decoded
    ``key``; the base64 form of the digest becomes the ``sig`` field.

    Args:
        verb: HTTP method of the request; lower-cased before signing.
        resource_type: Resource type segment (e.g. ``"docs"``); lower-cased
            before signing.
        resource_link: Resource link to sign; used exactly as given.
        date: Request date string; lower-cased before signing.
        key: Base64-encoded master key.

    Returns:
        The percent-encoded string ``type=master&ver=1.0&sig=<signature>``.

    Raises:
        binascii.Error: If ``key`` is not correctly padded base64.
    """
    key_type = "master"
    token_version = "1.0"
    payload = f"{verb.lower()}\n{resource_type.lower()}\n{resource_link}\n{date.lower()}\n\n"
    decoded_key = base64.b64decode(key)
    digest = hmac.new(decoded_key, payload.encode("utf-8"), hashlib.sha256).digest()
    # b64encode (unlike encodebytes) emits no trailing "\n"; a newline here
    # would be percent-encoded as "%0A" and corrupt the signature.
    signature = base64.b64encode(digest).decode("utf-8")
    return urllib.parse.quote(f"type={key_type}&ver={token_version}&sig={signature}")
def get_cosmosdb_data():
    """Run ``SELECT * FROM c`` against the configured container via the Cosmos DB REST API.

    Reads the module-level ``COSMOSDB_URI``, ``DATABASE_ID``, ``CONTAINER_ID``
    and ``COSMOSDB_PRIMARY_KEY`` settings, signs the request with
    ``generate_master_key_authorization_signature`` and posts the query to the
    container's ``docs`` feed. The request date and the final URL are printed
    to stdout.

    Returns:
        The parsed JSON body of the response when the status code is 200.

    Raises:
        Exception: If the service answers with any status other than 200.
    """
    # SQL queries must be sent with POST; GET on the docs feed only lists documents.
    verb = 'POST'
    resource_type = 'docs'
    # The signature covers the parent collection link only; the trailing
    # "/docs" segment belongs to the request URL, not to the signed payload.
    resource_link = f'dbs/{DATABASE_ID}/colls/{CONTAINER_ID}'
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    date = datetime.datetime.now(datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
    print(date)
    auth_header = generate_master_key_authorization_signature(
        verb, resource_type, resource_link, date, COSMOSDB_PRIMARY_KEY
    )
    headers = {
        'Authorization': auth_header,
        'x-ms-version': '2018-12-31',
        'x-ms-date': date,
        'Content-Type': 'application/query+json',
        'x-ms-documentdb-isquery': 'True',
        # Lets the query fan out when the container has more than one partition.
        'x-ms-documentdb-query-enablecrosspartition': 'True',
    }
    query = {"query": "SELECT * FROM c"}
    # Tolerate COSMOSDB_URI with or without a trailing slash.
    cosmosdb_uri = f"{COSMOSDB_URI.rstrip('/')}/{resource_link}/docs"
    print(cosmosdb_uri)
    # A timeout keeps a stalled connection from hanging the caller forever.
    # NOTE(review): only a single response is read; continuation pages, if
    # the service returns any, are not followed.
    response = requests.post(cosmosdb_uri, headers=headers, json=query, timeout=30)
    if response.status_code != 200:
        raise Exception(f"Error retrieving data from Cosmos DB: {response.text}")
    return response.json()
if __name__ == '__main__':
    # Script entry point: fetch the documents and print them to stdout.
    print(get_cosmosdb_data())