# update: 2022-3-30-15
"""
Thin convenience wrapper around pymongo's MongoClient.

Query operator cheat sheet:
    $eq     equal
    $ne     not equal
    $in     value is in a list, e.g.
            db.collection.find({ field: { $not: { $in: [value1, value2, value3] } } })
    $regex  fuzzy (pattern) match
    $gt     greater than (>)
    $lt     less than (<)
    $gte    greater than or equal (>=)
    $lte    less than or equal (<=)

Install:
    pip install pymongo==3.11.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
"""
import re

from pymongo import MongoClient, TEXT
from bson.objectid import ObjectId


class Client(MongoClient):
    """MongoClient bound to one default database, exposed as ``self.db``."""

    def __init__(self, host='sri-thirdparty-mongo', port=27017, database='vms', username='admin', password='admin'):
        """
        Connect to MongoDB and select the default database.

        Internal access:
            host: sri-thirdparty-mongo
            port: 27017
        Remote access:
            host: 118.190.217.96 / 192.168.20.162
            port: 7030

        NOTE(review): username and password are interpolated into the URI
        verbatim, so values containing characters such as '@', ':' or '/'
        presumably have to be percent-escaped by the caller -- confirm
        against the pymongo URI rules before changing this.
        """
        super().__init__(f'mongodb://{username}:{password}@{host}:{port}')
        self.db = self[database]

    def run_command(self, command=None, command_type='dict', use_admin=False):
        """
        Run a database command and return the server reply.

        command: command name (str) or command document (dict); when None
            nothing is executed and None is returned
        command_type: kept for backward compatibility only; the command is
            passed through to pymongo as given
        use_admin: run against the 'admin' database instead of self.db

        Command examples:
            self.db.command('serverStatus')
            self.db.command('dbstats')
            self['admin'].command('listDatabases', 1)  # database information
            self['admin'].command({'setParameter': 1, 'internalQueryExecMaxBlockingSortBytes': 52428800})  # set the sort memory limit
            self['admin'].command({'getParameter': 1, 'internalQueryExecMaxBlockingSortBytes': 1})  # read the sort memory limit
            self.db.command('collstats', 'WorkFlowPlugin')  # collection information
            self.db.command({'collStats': 'WorkFlowPlugin'})  # collection information

        Usage:
            self.run_command(
                command={'setParameter': 1, 'internalQueryExecMaxBlockingSortBytes': 52428800},
                use_admin=True
            )  # set the sort memory limit
        """
        if command is None:
            return None
        # Previously only the admin + dict combination executed anything, so a
        # call with the default arguments was a silent no-op.
        target = self['admin'] if use_admin else self.db
        return target.command(command)

    def get_indexes(self, table_name):
        """
        Return the index information of a collection.

        Alternatives: collection.list_indexes()
        """
        return self.db[table_name].index_information()

    def add_index(self, table_name, index_list):
        """
        Create an index.

        table_name: collection name
        index_list: [("field", 1)]

        Text index example:
            collection.create_index([('sec_id', TEXT)], name='search_index_zyq', default_language='english')
        """
        return self.db[table_name].create_index(index_list)

    def del_index(self, table_name, index_or_name):
        """
        Drop an index.

        https://www.dotnetperls.com/create-index-mongodb

        table_name: collection name
        index_or_name: index name, or the key list used to create it, e.g. [("field", 1)]
        """
        return self.db[table_name].drop_index(index_or_name)

    def add(self, table_name, data):
        """
        Insert one document (dict) or several documents (list).

        doc: https://www.cnblogs.com/aademeng/articles/9779271.html

        Returns the inserted id as str for a dict, or a list of str ids for a
        list. An empty list inserts nothing and returns [].
        Raises TypeError for any other data type instead of silently dropping
        the data.
        """
        table = self.db[table_name]
        if isinstance(data, dict):
            return str(table.insert_one(data).inserted_id)
        if isinstance(data, list):
            if not data:
                # Nothing to insert; avoid sending an empty batch to insert_many.
                return []
            return [str(inserted_id) for inserted_id in table.insert_many(data).inserted_ids]
        raise TypeError(f'data must be a dict or a list, got {type(data).__name__}')

    def filter(self, table_name, condition_dict, sort_field=None):
        """
        Query documents matching a condition.

        condition_dict: query condition
        sort_field: [('sort field', -1)]

        Returns a pymongo cursor.
        """
        cursor = self.db[table_name].find(condition_dict)
        return cursor.sort(sort_field) if sort_field else cursor

    def filter_by_aggregate(self, table_name, condition=None, sort_dict=None, page=None, size=None):
        """
        Query through an aggregation pipeline, with optional paging.

        condition: $match condition
        sort_dict: sort specification, {'field': -1}
        page: 1-based page number
        size: documents per page

        Paging is applied only when both page and size are integers.
        """
        pipeline = []
        if condition:
            pipeline.append({'$match': condition})
        if sort_dict:
            pipeline.append({'$sort': sort_dict})
        if isinstance(page, int) and isinstance(size, int):
            # Clamp so that page < 1 behaves like the first page instead of
            # producing a negative $skip.
            pipeline.append({'$skip': max(page - 1, 0) * size})  # documents to skip
            pipeline.append({'$limit': size})
        return self.db[table_name].aggregate(pipeline, allowDiskUse=True)

    def get_count(self, table_name, condition_dict=None):
        """
        Count the documents matching a condition (all documents when empty).
        """
        return self.db[table_name].count_documents(condition_dict or {})

    def get_all(self, table_name, sort_field=None):
        """
        Return a cursor over every document, optionally sorted.

        table_name: collection name
        sort_field: [('sort field', -1)]
        """
        cursor = self.db[table_name].find()
        return cursor.sort(sort_field) if sort_field else cursor

    def get_one(self, table_name, unique_dict):
        """
        Return the first document matching unique_dict, or an empty dict.
        """
        return self.db[table_name].find_one(unique_dict) or dict()

    def get_last_one(self, table_name, sort_list=None):
        """
        Return the last document according to sort_list.

        table_name: collection name
        sort_list: defaults to [('_id', -1)]
        """
        return self.db[table_name].find_one(sort=sort_list or [('_id', -1)])

    def get_one_by_id(self, table_name, object_id):
        """
        Return the document with the given _id, or None when it does not exist.
        """
        return self.db[table_name].find_one({'_id': ObjectId(str(object_id))})

    def update_one_by_id(self, table_name, object_id, update_dict, need_back=False):
        """
        Apply {'$set': update_dict} to the document with the given _id.

        need_back: when True return the updated document, with '_id' removed
            and 'uuid' set to object_id (an empty dict when the document does
            not exist); otherwise return the modified count.
        """
        table = self.db[table_name]
        result = table.update_one({'_id': ObjectId(str(object_id))}, {'$set': update_dict})
        if not need_back:
            return result.modified_count

        item = self.get_one_by_id(table_name, object_id)
        if item is None:
            # The document does not exist; previously this crashed on None.pop().
            return dict()
        item.pop('_id')
        item['uuid'] = object_id
        return item

    def update_all(self, table_name, condition_dict, update_dict):
        """
        Apply {'$set': update_dict} to every matching document.

        Returns the number of matched documents.
        """
        return self.db[table_name].update_many(condition_dict, {'$set': update_dict}).matched_count

    def update_one(self, table_name, unique_dict, update_dict, default_dict=None, need_back=False):
        """
        Update the document matching unique_dict, or insert one if missing.

        found:     $set update_dict + unique_dict on the existing document
        not found: insert default_dict (without '_id') + update_dict + unique_dict

        The caller's dictionaries are not modified.

        Returns the document (need_back=True) or its id as str.
        """
        existing = self.get_one(table_name, unique_dict)
        if existing:
            object_id = str(existing.get('_id'))
            changes = dict(update_dict)  # copy: never mutate the caller's dict
            changes.update(unique_dict)
            self.update_one_by_id(table_name, object_id, changes)
        else:
            document = dict(default_dict) if default_dict else dict()
            document.pop('_id', None)  # let MongoDB assign a fresh _id
            document.update(update_dict)
            document.update(unique_dict)
            object_id = self.add(table_name, document)

        if need_back:
            return self.get_one_by_id(table_name, object_id)
        return object_id

    def remove(self, table_name, condition_dict):
        """
        Delete every matching document and return the deleted count.
        """
        return self.db[table_name].delete_many(condition_dict).deleted_count

    def remove_one_by_id(self, table_name, object_id):
        """
        Delete the document with the given _id and return the deleted count.
        """
        return self.db[table_name].delete_one({'_id': ObjectId(str(object_id))}).deleted_count

    def distinct(self, table_name, field):
        """
        Return the distinct values of a field.
        """
        return self.db[table_name].distinct(field)

    def get_all_vague(self, table_name, unique_dict):
        """
        Fuzzy query: return a cursor for the given condition.
        """
        return self.db[table_name].find(unique_dict)

    def get_aggregate(self, table_name, condition):
        """
        Grouped aggregation query (by wdj); condition is the pipeline.
        """
        return self.db[table_name].aggregate(condition)

    def get_col_list(self, table_name, condition, _dic):
        """
        Return only selected columns (by wdj); _dic is the projection.
        """
        return self.db[table_name].find(condition, _dic)

    # def find_by_aggregate(self, table_name, ip):
    #     """
    #     Grouped query: {"$match": {"ip": ip}} {"$group": {"_id": "", "counter": {"$sum": 1}}}
    #     """
    #     table = self.db[table_name]
    #     count = table.aggregate([{"$group": {"_id": "$find_plugin", "count": {"$sum": "$risk_score"}}}])
    #     return count

    def deep_filter(self, collection, field_path, filter_type='equal', filter_value=None, return_count=False,
                    sort_field=None):
        """
        Filter on a single (possibly nested, dotted) field.

        field_path: nested field path (for dict-typed fields)
        filter_type: one of
            contain   value contains filter_value (literal substring)
            !contain  value does not contain filter_value
            equal     value equals filter_value
            !equal    value differs from filter_value
            in        value is one of filter_value
            !in       value is none of filter_value
            Any other value keeps the legacy behaviour of matching the field
            against an empty rule ({}).
        sort_field: [('sort field', -1)]

        Returns the match count when return_count is True, else a cursor.
        """
        if filter_type == 'equal':
            filter_rule = {'$eq': filter_value}
        elif filter_type == '!equal':
            filter_rule = {'$ne': filter_value}
        elif filter_type in ('contain', '!contain'):
            # Escape so regex metacharacters in the value are matched literally.
            pattern = re.escape(str(filter_value))
            if filter_type == 'contain':
                filter_rule = {'$regex': pattern}
            else:
                # $not takes a regular expression object.
                filter_rule = {'$not': re.compile(pattern)}
        elif filter_type in ('in', '!in'):
            if isinstance(filter_value, (list, tuple, set)):
                values = list(filter_value)
            else:
                values = [filter_value]
            filter_rule = {'$in' if filter_type == 'in' else '$nin': values}
        else:
            filter_rule = {}

        condition = {field_path: filter_rule}
        if return_count:
            return self.get_count(collection, condition)
        return self.filter(collection, condition, sort_field)

    def del_collect(self):
        """
        Drop EVERY collection of the bound database. Destructive.
        """
        # collection_names() is deprecated; list_collection_names() replaces it.
        for collection in self.db.list_collection_names():
            self.db[collection].drop()

    @staticmethod
    def uuid2oid(uuid):
        """Convert an id string into an ObjectId."""
        return ObjectId(uuid)


if __name__ == '__main__':
    # sudo python3 -m pip install pymongo
    # sudo python3 /home/server/projects/taiwuict/cscec-8bur-vms/supplement-python/clients/db_mongo.py

    # --- init ---
    # mdb = Client(database='vms', host='192.168.0.16', port=7030)
    # mdb = Client(database='vms', host='192.168.51.242', port=7030)
    # mdb = Client(database='vms', host='192.168.1.233', port=7030)
    # mdb = Client(database='vms', host='10.8.20.115', port=7030)  # Chenming Building
    # mdb = Client(database='vms', host='192.168.0.15', port=7030)
    # mdb = Client(database='vms', host='192.168.101.199', port=7030)  # Zhangqiu project
    # mdb = Client(database='vms', host='192.168.15.195', port=7030)  # site 3
    # mdb = Client(database='vms', host='192.168.1.242', port=7030)  # site 4
    # mdb = Client(database='ar', host='58.34.94.176', port=7030)  # site 4
    mdb = Client(host='127.0.0.1', port=47017, database='ar', username='admin', password='admin')

    # --- test: filter by name ---
    __condition = {
        # 'face_name': {'$ne': None},
        # 'face_name': {'$eq': None},
        'face_name': {'$regex': '陈忠福'},
        # 'face_features': {'$eq': b'\x80\x03N.'},
    }
    items = mdb.filter('Face', __condition)
    # for item in items:
    #     print(item)
    # total = mdb.get_count('Face', __condition)
    # print(total)

    # --- test: insert ---
    # data = {
    #     'username': 'aabbss',
    #     'password': 'aabbss',
    #     'role_id': 'aabbss',
    #     'create_at': 11223344,
    # }
    # uuid = mdb.add('User', data)

    # --- test: count / remove / count ---
    # __condition = {'face_name': {'$eq': None}}
    # print(mdb.get_count('Face', __condition))
    # print(mdb.remove('Face', __condition))
    # print(mdb.get_count('Face', __condition))

    # --- test: get by id ---
    # _id = '62cbbefba007e25f12acea9d'
    # print(mdb.get_one_by_id('Face', _id))

    # --- test: scan all ---
    # for item in mdb.get_all('Face'):
    #     if 'c7304b' in str(item.get('_id')):
    #         print(item)

    # --- test: get one by unique field ---
    # item = mdb.get_one('Face', {'prc_id': '<prc_id>'})
    # print(item.get('cscec8_id'))

    # --- test: $in ---
    # __condition = {
    #     'face_type_uuid_list': {'$in': ['62a190ca84b2c038bac3519d', 'aa', '62a1a438024e59b2e654467a']}
    # }
    # for item in mdb.filter('Face', __condition):
    #     print(item.get('face_name'))

    # --- test: update every document ---
    # for item in mdb.filter('Face', {}):
    #     mdb.update_one_by_id('Face', str(item.get('_id')), {'upload_is': None})

    # --- test: update by id ---
    # _id = '6295915592325efa38758bdf'
    # mdb.update_one_by_id('Face', _id, {'cscec8_id': 'f6bae517-0a28-429c-b65b-b16548ab2ca4'})

    # --- test: modified within the last 100 days ---
    # import time
    # __condition = dict(modify_at={'$gte': time.time() - 86400 * 100})
    # for item in mdb.filter('Face', __condition):
    #     print(item.keys())
    # print(mdb.get_count('Face', __condition))

    # --- test: nested list field ---
    # _uuid = '6243fa35d169a1bf065f8f02'
    # item = mdb.get_one_by_id('Face', _uuid)
    # print(item.get('face_feature_info_list')[0])

    # --- test: Face is the stranger-face table, Face.face_name is the person's name ---
    # __condition = {'face_name': {'$regex': '王'}}
    # for item in mdb.filter('Face', __condition):
    #     print(item)
    #     break
    # print(mdb.get_count('Face', __condition))

    # --- test: count by type name ---
    # print(mdb.get_count('FaceType', {'name': {'$eq': '未知类型'}}))

    # --- test: missing field with default ---
    # for item in mdb.get_all('VisitorInfo'):
    #     _list = item.get('aaabbbccc', [])
    #     print('_list', type(_list), _list)
    #     break

    # --- test: aggregate with paging ---
    # _condition = {'ipv4': '192.168.30.232'}
    # items = mdb.filter_by_aggregate('UserCamera', condition=_condition, sort_dict={'create_at': -1}, page=1,
    #                                 size=10)
    # print(sum(1 for _ in items))

    # --- test: remove by foreign key ---
    # _uuid = '604c22d22043eb79cb425c9c'
    # print(mdb.remove('AgentDetectionRecord', {'camera_uuid': _uuid}))

    # --- test: query by ObjectId list ---
    # _condition = {'_id': {'$in': [ObjectId('605994eb7c29e9dc3887e639')]}}
    # for item in mdb.filter('AgentDetectionRecord', _condition):
    #     print(item)

    # --- test: total count ---
    # print(mdb.get_count('VisitorInfo'))

    # Release the connection pool and monitor threads before exiting.
    mdb.close()