Hi team,
I am trying to fetch data from Azure Cosmos DB for MongoDB.
The "Resource" collection contains about 500,000 records,
and I need to fetch them all,
so I query the data with pagination (limit=5000, offset starting from 0).
The first ~255,000 records came back fine,
but the batch at offset=255000 failed with: pymongo.errors.ExecutionTimeout: Request timed out
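For reference, each page is fetched essentially like this (a minimal standalone sketch of the pattern; the full script is at the bottom of this post):

import os
import pymongo

client = pymongo.MongoClient(os.getenv("DB_CON"))
db = client["metering"]

limit, offset = 5000, 0
# One page: newest-first sort, then skip to the offset and take `limit` docs.
page = list(
    db['Resource'].find(max_time_ms=180 * 1000)
    .sort("updatedAt", pymongo.DESCENDING)
    .limit(limit)
    .skip(offset)
)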
logs:
2023-08-16 05:33:05,220 issue878_export_to_redis.py[line:100] INFO get_Resource, limit=5000, offset=245000
2023-08-16 05:34:03,780 issue878_export_to_redis.py[line:100] INFO get_Resource, limit=5000, offset=250000
2023-08-16 05:35:03,521 issue878_export_to_redis.py[line:100] INFO get_Resource, limit=5000, offset=255000
Traceback (most recent call last):
  File "issue878_export_to_redis.py", line 268, in <module>
    dbUtil.get_Resource()
  File "issue878_export_to_redis.py", line 103, in get_Resource
    for it in self.db['Resource'].find(max_time_ms=60*1000).sort("updatedAt",pymongo.DESCENDING).limit(limit).skip(offset):
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/cursor.py", line 1251, in next
    if len(self.__data) or self._refresh():
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/cursor.py", line 1168, in _refresh
    self.__send_message(q)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/cursor.py", line 1055, in __send_message
    response = client._run_operation(
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/_csot.py", line 106, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1341, in _run_operation
    return self._retryable_read(
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/_csot.py", line 106, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1465, in _retryable_read
    return func(session, server, sock_info, read_pref)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1337, in _cmd
    return server.run_operation(
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/helpers.py", line 279, in inner
    return func(*args, **kwargs)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/server.py", line 135, in run_operation
    _check_command_response(first, sock_info.max_wire_version)
  File "/home/zhong_wen_zhou1/.local/lib/python3.8/site-packages/pymongo/helpers.py", line 190, in _check_command_response
    raise ExecutionTimeout(errmsg, code, response, max_wire_version)
pymongo.errors.ExecutionTimeout: Request timed out. Retries due to rate limiting: False., full error: {'ok': 0.0, 'errmsg': 'Request timed out. Retries due to rate limiting: False.', 'code': 50, 'codeName': 'ExceededTimeLimit'}
code:
import os
import sys
import logging
import csv
import datetime
import time
import json
from dotenv import load_dotenv
import pymongo
import pprint
import redis
LOG = logging.getLogger()
DB_CON = ''
CA_PATH = ''
'''
pip3 install redis
pip3 install python-dotenv
pip3 install pymongo
'''
class DBMigration(object):
    def __init__(self):
        self.db_client = pymongo.MongoClient(DB_CON)
        self.db = self.db_client["metering"]
        # Dump client options, server info, and server status for debugging.
        for prop, value in vars(self.db_client.options).items():
            print("Property: {}: Value: {}".format(prop, value))
        for k, v in self.db_client.server_info().items():
            print("Key: {}, Value: {}".format(k, v))
        print("Server status {}".format(self.db_client.admin.command("serverStatus")))
        databases = self.db_client.list_database_names()
        print("Databases: {}".format(databases))
    def get_Resource(self):
        limit = 5000
        offset = 240000
        while True:
            LOG.info('get_Resource, limit=%s, offset=%s' % (limit, offset))
            resource_map = {}
            snapshotId_list = []
            # Page through the collection: newest first, skip/limit pagination.
            for it in self.db['Resource'].find(max_time_ms=180 * 1000).sort("updatedAt", pymongo.DESCENDING).limit(limit).skip(offset):
                key = '_kvModel:Resource:%s' % (it['instanceUUID'])
                if not it.get('lastSnapshotId'):
                    LOG.error('key=%s' % (key))
                    continue
                # if it.get('resourceData', {}).get('eventType', '') == 'delete':
                #     continue
                resource_map[key] = {}
                resource_map[key]['resource'] = self._convert_json(it)
                snapshotId_list.append(it['lastSnapshotId'])
            LOG.info('get_Resource, got results=%s' % (len(snapshotId_list)))
            if len(snapshotId_list) == 0:
                LOG.info('no more records')
                break
            # resource_map, eventId_list = self.get_lastSnapshot(snapshotId_list, resource_map)
            # resource_map = self.get_Events(eventId_list, resource_map)
            # json.dump(resource_map, open('/temp/resource_map.txt', 'w+'), indent=2)
            # break
            # Serialize each entry to a JSON string for redis mset().
            data_dict = {}
            for key, info in resource_map.items():
                data_dict[key] = json.dumps(info)
            # self.redis.mset(data_dict)
            offset += limit
    def _convert_json(self, mongoObj):
        # Make a Mongo document JSON-serializable: stringify ObjectId values
        # and render datetimes as ISO-8601 with a trailing 'Z'.
        info = {}
        for k, v in mongoObj.items():
            if 'ObjectId' in str(type(v)):
                if k == '_id':
                    info['_id'] = str(v)
                else:
                    info[k] = str(v)
            elif 'datetime' in str(type(v)):
                info[k] = '%sZ' % (v.isoformat())
            else:
                info[k] = v
        return info
    def clearDB(self):
        # NOTE: assumes self.redis has been initialized elsewhere.
        rs = self.redis.flushall(asynchronous=True)
        print('empty whole db now')
if __name__ == "__main__":
    if not LOG.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s"))
        LOG.addHandler(handler)
    LOG.setLevel(logging.DEBUG)
    env = 'Azure_DEV'
    env_file = './issue878_%s.env' % (env)
    load_dotenv(env_file)
    DB_CON = os.getenv("DB_CON")
    if not DB_CON:
        raise Exception('please set env var DB_CON first, example: export DB_CON="mongodb://localhost:27017/"')
    print(50 * '#')
    print('please double check DB_CON=%s' % (DB_CON))
    print(50 * '#')
    LOG.debug('env=%s, env_file=%s' % (env, env_file))
    time.sleep(5)
    dbUtil = DBMigration()
    dbUtil.get_Resource()
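One idea I am considering, in case the growing skip() is what makes later pages so expensive: page on the sort key instead of an offset, so each query starts where the previous page ended. A rough, untested sketch of what I mean (it assumes updatedAt values are unique; with ties I would need a tie-breaker such as _id):

import os
import pymongo

client = pymongo.MongoClient(os.getenv("DB_CON"))
db = client["metering"]

page_size = 5000
last_updated = None  # updatedAt of the last document from the previous page
while True:
    # Range filter replaces skip(): resume strictly below the last seen updatedAt.
    query = {} if last_updated is None else {"updatedAt": {"$lt": last_updated}}
    page = list(
        db["Resource"].find(query, max_time_ms=180 * 1000)
        .sort("updatedAt", pymongo.DESCENDING)
        .limit(page_size)
    )
    if not page:
        break
    last_updated = page[-1]["updatedAt"]
    # ... process the page here ...

Would that be the recommended way to avoid this timeout on Cosmos DB, or is there a better approach?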