|
- from werkzeug.security import generate_password_hash
- from hub import methods, Global
async def code1001(**sources):
    """Create a new user record in the ``UserInfo`` collection.

    Keyword Args:
        username: Login account (required).
        password: Plain-text password; it is hashed before being stored.
            When missing or empty, the default password is hashed instead.
        role_type: 1 = super admin, 2 = admin, 3 = normal user (default 3).
        name: Real name.
        cid: ID-card number.
        phone: Mobile number.
        sex: 0 = unset, 1 = male, 2 = female (default 0).
        department: Department.
        wid: Employee number.

    Returns:
        ``dict(code=0, data=<value returned by Global.mdb.add>)`` on success,
        or ``dict(code=1, details=...)`` when ``username`` is missing/empty.
    """
    if not sources.get('username'):
        return dict(code=1, details="something is wrong.")

    # NOTE(review): the fallback password is hard-coded in source; consider
    # moving it to configuration and forcing a change on first login.
    password_hash = generate_password_hash(
        sources.get('password') or 'baosteel@2024'
    )

    # UserInfo fields not set here: create_at (creation time) and
    # update_at (modification time).
    data = {
        'username': sources.get('username'),
        'password': password_hash,
        # Enumerations are stored as strings. Fall back to the documented
        # defaults so a missing/None value is never persisted as 'None'.
        'role_type': str(sources.get('role_type') or 3),
        'name': sources.get('name'),
        'cid': sources.get('cid'),
        'phone': sources.get('phone'),
        'sex': str(sources.get('sex') or 0),
        'department': sources.get('department'),
        'wid': sources.get('wid'),
        # state: '0' = active, '1' = disabled.
        'state': '0',
        # allow_at: registration approval time.
        'allow_at': methods.now_ts(),
    }
    uuid = Global.mdb.add('UserInfo', data)
    return dict(code=0, data=uuid)
async def code1002(**sources):
    """Return one page of users, optionally filtered.

    Keyword Args:
        page: 1-based page number (required; must convert to an int >= 1).
        size: Page size (required; must convert to an int >= 1).
        name: Optional substring that must occur in the user's name.
        phone: Optional phone number; compared for equality as strings.
        role_type: Optional role filter. A list/tuple/set is treated as the
            set of accepted roles; any other value is stringified and the
            user's role must occur in that string.

    Returns:
        ``dict(code=0, data=<page of users>, total=<matches>, page=, size=)``
        on success; ``code=1`` when ``page`` is missing or invalid;
        ``code=2`` when ``size`` is missing or invalid.
    """
    if not sources.get('page'):
        return dict(code=1, details="something is wrong.")
    if not sources.get('size'):
        return dict(code=2, details="something is wrong.")

    # Values may arrive as strings; coerce them before doing arithmetic.
    try:
        page = int(sources.get('page'))
    except (TypeError, ValueError):
        return dict(code=1, details="something is wrong.")
    try:
        size = int(sources.get('size'))
    except (TypeError, ValueError):
        return dict(code=2, details="something is wrong.")
    if page < 1:
        return dict(code=1, details="something is wrong.")
    if size < 1:
        return dict(code=2, details="something is wrong.")

    name_filter = sources.get('name')
    phone_filter = sources.get('phone')
    role_filter = sources.get('role_type')
    if isinstance(role_filter, (list, tuple, set, frozenset)):
        # Roles are compared as strings, so normalise the accepted values.
        role_filter = {str(role) for role in role_filter}
    elif role_filter:
        role_filter = str(role_filter)

    matched = []
    # NOTE(review): the whole collection is read and filtered in Python;
    # the -1 sort direction presumably means newest approval first.
    for item in Global.mdb.get_all('UserInfo', sort_field=[('allow_at', -1)]):
        if name_filter and str(name_filter) not in str(item.get('name')):
            continue
        if phone_filter and str(phone_filter) != str(item.get('phone')):
            continue

        # Records without a role are treated as normal users ('3').
        role = str(item.get('role_type') or '3')
        if role_filter and role not in role_filter:
            continue

        item['role_type'] = role
        item['sex'] = str(item.get('sex'))
        item['state'] = str(item.get('state'))
        item['uuid'] = str(item.get('_id'))
        item.pop('_id', None)
        # Never expose the password hash; tolerate records that lack it.
        item.pop('password', None)
        matched.append(item)

    start = (page - 1) * size
    return dict(
        code=0,
        data=matched[start:start + size],
        total=len(matched),
        page=page,
        size=size,
    )
async def code1003(**sources):
    """Delete the user identified by ``uuid``.

    Keyword Args:
        uuid: Identifier of the ``UserInfo`` record to remove (required).

    Returns:
        ``{'code': 0, 'data': uuid}`` after removal; ``code=1`` when
        ``uuid`` is missing/empty; ``code=2`` when no such record is found.
    """
    target_id = sources.get('uuid')
    if not target_id:
        return {'code': 1, 'details': "something is wrong."}

    # Look the record up first so a missing user is reported as code 2.
    existing = Global.mdb.get_one_by_id('UserInfo', target_id)
    if not existing:
        return {'code': 2, 'details': "something is wrong."}

    Global.mdb.remove_one_by_id('UserInfo', target_id)
    return {'code': 0, 'data': target_id}
async def code1004(**sources):
    """Disable the user identified by ``uuid``.

    Keyword Args:
        uuid: Identifier of the ``UserInfo`` record to disable (required).

    Returns:
        ``dict(code=0, data=<value returned by update_one_by_id>)`` on
        success; ``code=1`` when ``uuid`` is missing/empty; ``code=2`` when
        no such record is found.
    """
    uuid = sources.get('uuid')
    if not uuid:
        return dict(code=1, details="something is wrong.")

    if not Global.mdb.get_one_by_id('UserInfo', uuid):
        return dict(code=2, details="something is wrong.")

    # UserInfo.state: '0' = active, '1' = disabled. It is written as a
    # string to match how new users are created and how the edit handler
    # normalises the field.
    # NOTE(review): confirm no reader compares state against the int 1.
    update_dict = {'state': '1'}
    item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True)
    return dict(code=0, data=item)
async def code1005(**sources):
    """Enable (re-activate) the user identified by ``uuid``.

    Keyword Args:
        uuid: Identifier of the ``UserInfo`` record to enable (required).

    Returns:
        ``dict(code=0, data=<value returned by update_one_by_id>)`` on
        success; ``code=1`` when ``uuid`` is missing/empty; ``code=2`` when
        no such record is found.
    """
    uuid = sources.get('uuid')
    if not uuid:
        return dict(code=1, details="something is wrong.")

    if not Global.mdb.get_one_by_id('UserInfo', uuid):
        return dict(code=2, details="something is wrong.")

    # UserInfo.state: '0' = active, '1' = disabled. It is written as a
    # string to match how new users are created and how the edit handler
    # normalises the field.
    # NOTE(review): confirm no reader compares state against the int 0.
    update_dict = {'state': '0'}
    item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True)
    return dict(code=0, data=item)
async def code1006(**sources):
    """Return the details of the user identified by ``uuid``.

    Keyword Args:
        uuid: Identifier of the ``UserInfo`` record to fetch (required).

    Returns:
        ``dict(code=0, data=<user record>)`` on success, where the record
        has ``uuid`` in place of ``_id``, no ``password`` entry, and
        ``role_type``/``sex``/``state`` rendered as strings; ``code=1``
        when ``uuid`` is missing/empty; ``code=2`` when no record is found.
    """
    uuid = sources.get('uuid')
    if not uuid:
        return dict(code=1, details="something is wrong.")

    item = Global.mdb.get_one_by_id('UserInfo', uuid)
    if not item:
        return dict(code=2, details="something is wrong.")

    # UserInfo fields: username, password, role_type (1 super admin,
    # 2 admin, 3 normal user), create_at, update_at, name, cid (ID-card
    # number), phone, sex (0 unset, 1 male, 2 female), department,
    # wid (employee number), allow_at (approval time),
    # state (0 active, 1 disabled).
    item.pop('_id', None)
    # The password hash must never leave the server; the list endpoint
    # strips it as well.
    item.pop('password', None)
    item['uuid'] = uuid

    # Records without a role are treated as normal users ('3').
    item['role_type'] = str(item.get('role_type') or '3')
    item['sex'] = str(item.get('sex'))
    item['state'] = str(item.get('state'))
    return dict(code=0, data=item)
async def code1007(**sources):
    """Update the profile of the user identified by ``uuid``.

    Only fields supplied with a truthy value are changed; ``update_at`` is
    always refreshed.

    Keyword Args:
        uuid: Identifier of the ``UserInfo`` record to modify (required).
        username: New login account.
        password: New plain-text password; hashed before being stored.
        role_type: 1 = super admin, 2 = admin, 3 = normal user.
        name: Real name.
        cid: ID-card number.
        phone: Mobile number.
        sex: 0 = unset, 1 = male, 2 = female.
        department: Department.
        wid: Employee number.

    Returns:
        ``dict(code=0, data=<value returned by update_one_by_id>)`` on
        success; ``code=1`` when ``uuid`` is missing/empty; ``code=2`` when
        no such record is found.
    """
    uuid = sources.get('uuid')
    if not uuid:
        return dict(code=1, details="something is wrong.")

    item = Global.mdb.get_one_by_id('UserInfo', uuid)
    if not item:
        return dict(code=2, details="something is wrong.")

    update_dict = {'update_at': methods.now_ts()}

    # Plain fields are copied through unchanged when provided.
    for field in ('username', 'name', 'cid', 'phone', 'department', 'wid'):
        value = sources.get(field)
        if value:
            update_dict[field] = value

    if sources.get('password'):
        update_dict['password'] = generate_password_hash(sources.get('password'))

    # Enumerations are stored as strings.
    for field in ('role_type', 'sex'):
        value = sources.get(field)
        if value:
            update_dict[field] = str(value)

    # Migrate legacy int-typed enumerations to strings. setdefault keeps a
    # value requested in this call from being overwritten by the old one.
    for field in ('role_type', 'sex', 'state'):
        stored = item.get(field)
        if isinstance(stored, int) and not isinstance(stored, bool):
            update_dict.setdefault(field, str(stored))

    item = Global.mdb.update_one_by_id('UserInfo', uuid, update_dict, need_back=True)
    return dict(code=0, data=item)
|