from import generate_password_hash from hub import methods, Global async def code1001(**sources): """ 新增用户 """ # --- check --- if not sources.get('username'): return dict(code=1, details=f"something is wrong.") # --- check --- if sources.get('password'): sources['password'] = generate_password_hash(sources.get('password')) else: sources['password'] = generate_password_hash('baosteel@2024') # --- save --- """ UserInfo: 用户信息表 UserInfo.username: 登录账号 UserInfo.password: 登录密码 (默认值:baosteel@2024) UserInfo.role_type: 角色类型 (1: 超级管理员 2: 普通管理员 3: 普通用户) (默认值:3) UserInfo.create_at: 创建时间 UserInfo.update_at: 修改时间 姓名 UserInfo.cid: 身份证号 手机号 性别 (0:未设置 1:男 2:女) UserInfo.department: 部门 UserInfo.wid: 工号 UserInfo.allow_at: 注册审批时间 UserInfo.state: 禁用状态 (0:激活 1:禁用) """ data = { 'username': sources.get('username'), 'password': sources.get('password'), 'role_type': str(sources.get('role_type', 3)), 'name': sources.get('name'), 'cid': sources.get('cid'), 'phone': sources.get('phone'), 'sex': str(sources.get('sex')), 'department': sources.get('department'), 'wid': sources.get('wid'), 'state': '0', 'allow_at': methods.now_ts(), } uuid = Global.mdb.add('UserInfo', data) return dict(code=0, data=uuid) async def code1002(**sources): """ 查询用户列表 """ # --- check --- if not sources.get('page'): return dict(code=1, details=f"something is wrong.") elif not sources.get('size'): return dict(code=2, details=f"something is wrong.") # --- debug --- # methods.debug_log('code1000.code1002.82:', f"#role_type: {sources.get('role_type')}") # --- fill d1 --- d1 = list() page = sources.get('page', 1) size = sources.get('size', 10) for item in Global.mdb.get_all('UserInfo', sort_field=[('allow_at', -1)]): # methods.debug_log('code1000|72', f"#item: {item.get('name')}") # --- check --- if sources.get('name') and sources.get('name') not in str(item.get('name')): continue # --- check --- if sources.get('phone') and str(sources.get('phone')) != str(item.get('phone')): continue # --- check --- if not item.get('role_type'): item['role_type'] = '3' # --- check --- if sources.get('role_type') and str(item.get('role_type')) not in sources.get('role_type'): continue # --- check --- item['role_type'] = str(item.get('role_type')) item['sex'] = str(item.get('sex')) item['state'] = str(item.get('state')) item['uuid'] = str(item.get('_id')) item.pop('_id') item.pop('password') d1.append(item) return dict(code=0, data=d1[(page - 1) * size: page * size], total=len(d1), page=page, size=size) async def code1003(**sources): """ 删除指定用户 """ # --- check --- uuid = sources.get('uuid') if not uuid: return dict(code=1, details=f"something is wrong.") # --- check --- item = Global.mdb.get_one_by_id('UserInfo', uuid) if not item: return dict(code=2, details=f"something is wrong.") # --- remove --- Global.mdb.remove_one_by_id('UserInfo', uuid) return dict(code=0, data=uuid) async def code1004(**sources): """ 禁用指定用户 """ # --- check --- uuid = sources.get('uuid') if not uuid: return dict(code=1, details=f"something is wrong.") # --- check --- item = Global.mdb.get_one_by_id('UserInfo', uuid) if not item: return dict(code=2, details=f"something is wrong.") # --- update --- """ UserInfo: 用户信息表 UserInfo.username: 登录账号 UserInfo.password: 登录密码 (默认值:baosteel@2024) UserInfo.role_type: 角色类型 (1: 超级管理员 2: 普通管理员 3: 普通用户) (默认值:3) UserInfo.create_at: 创建时间 UserInfo.update_at: 修改时间 姓名 UserInfo.cid: 身份证号 手机号 性别 (0:未设置 1:男 2:女) UserInfo.department: 部门 UserInfo.wid: 工号 UserInfo.allow_at: 注册审批时间 UserInfo.state: 禁用状态 (0:激活 1:禁用) """ update_dict = dict() update_dict['state'] = 1 item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True) return dict(code=0, data=item) async def code1005(**sources): """ 启用指定用户 """ # --- check --- uuid = sources.get('uuid') if not uuid: return dict(code=1, details=f"something is wrong.") # --- check --- item = Global.mdb.get_one_by_id('UserInfo', uuid) if not item: return dict(code=2, details=f"something is wrong.") # --- update --- """ UserInfo: 用户信息表 UserInfo.username: 登录账号 UserInfo.password: 登录密码 (默认值:baosteel@2024) UserInfo.role_type: 角色类型 (1: 超级管理员 2: 普通管理员 3: 普通用户) (默认值:3) UserInfo.create_at: 创建时间 UserInfo.update_at: 修改时间 姓名 UserInfo.cid: 身份证号 手机号 性别 (0:未设置 1:男 2:女) UserInfo.department: 部门 UserInfo.wid: 工号 UserInfo.allow_at: 注册审批时间 UserInfo.state: 禁用状态 (0:激活 1:禁用) """ update_dict = dict() update_dict['state'] = 0 item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True) return dict(code=0, data=item) async def code1006(**sources): """ 获取指定用户详情 """ uuid = sources.get('uuid') if not uuid: return dict(code=1, details=f"something is wrong.") item = Global.mdb.get_one_by_id('UserInfo', uuid) if not item: return dict(code=2, details=f"something is wrong.") # --- fill --- """ UserInfo: 用户信息表 UserInfo.username: 登录账号 UserInfo.password: 登录密码 (默认值:baosteel@2024) UserInfo.role_type: 角色类型 (1: 超级管理员 2: 普通管理员 3: 普通用户) (默认值:3) UserInfo.create_at: 创建时间 UserInfo.update_at: 修改时间 姓名 UserInfo.cid: 身份证号 手机号 性别 (0:未设置 1:男 2:女) UserInfo.department: 部门 UserInfo.wid: 工号 UserInfo.allow_at: 注册审批时间 UserInfo.state: 禁用状态 (0:激活 1:禁用) """ item.pop('_id') item['uuid'] = uuid # --- check --- if not item.get('role_type'): item['role_type'] = '3' # --- check --- item['role_type'] = str(item.get('role_type')) item['sex'] = str(item.get('sex')) item['state'] = str(item.get('state')) return dict(code=0, data=item) async def code1007(**sources): """ 修改指定用户信息 """ # --- check --- uuid = sources.get('uuid') if not uuid: return dict(code=1, details=f"something is wrong.") # --- check again --- item = Global.mdb.get_one_by_id('UserInfo', uuid) if not item: return dict(code=2, details=f"something is wrong.") # --- update UserCamera by camera_uuid --- """ UserInfo: 用户信息表 UserInfo.username: 登录账号 UserInfo.password: 登录密码 (默认值:baosteel@2024) UserInfo.role_type: 角色类型 (1: 超级管理员 2: 普通管理员 3: 普通用户) (默认值:3) UserInfo.create_at: 创建时间 UserInfo.update_at: 修改时间 姓名 UserInfo.cid: 身份证号 手机号 性别 (0:未设置 1:男 2:女) UserInfo.department: 部门 UserInfo.wid: 工号 UserInfo.allow_at: 注册审批时间 UserInfo.state: 禁用状态 (0:激活 1:禁用) """ update_dict = dict() update_dict['update_at'] = methods.now_ts() if sources.get('username'): update_dict['username'] = sources.get('username') if sources.get('password'): update_dict['password'] = generate_password_hash(sources.get('password')) if sources.get('role_type'): update_dict['role_type'] = str(sources.get('role_type')) if sources.get('name'): update_dict['name'] = sources.get('name') if sources.get('cid'): update_dict['cid'] = sources.get('cid') if sources.get('phone'): update_dict['phone'] = sources.get('phone') if sources.get('sex'): update_dict['sex'] = str(sources.get('sex')) if sources.get('department'): update_dict['department'] = sources.get('department') if sources.get('wid'): update_dict['wid'] = sources.get('wid') # --- check --- if type(item.get('role_type')) == int: update_dict['role_type'] = str(item.get('role_type')) if type(item.get('sex')) == int: update_dict['sex'] = str(item.get('sex')) if type(item.get('state')) == int: update_dict['state'] = str(item.get('state')) item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True) return dict(code=0, data=item)