from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import check_password_hash
from fastapi import APIRouter, Request, Response
from fastapi import HTTPException, Header
from fastapi.responses import JSONResponse

from hub import methods, Global

router = APIRouter()
# mdb = Global.get_mongodb_client()

# NOTE(review): the signing secret is hard-coded in source; it should be
# loaded from configuration/environment before this is deployed.
# NOTE(review): TimedJSONWebSignatureSerializer was deprecated in
# itsdangerous 2.0 and removed in 2.1 - confirm the pinned version.
# serializer = Serializer(secret_key='casper.com@2021', expires_in=86400 * 100)  # debug
serializer = Serializer(secret_key='casper.com@2021', expires_in=86400)  # release

# Maps str(role_type) to the role name returned to the client.
# 1: super administrator, 2: regular administrator, 3: regular user.
_ROLE_NAMES = {
    '1': '超级管理员',
    '2': '普通管理员',
    '3': '普通用户',
}


def get_token_by_user(user):
    """Generate a signed token for *user*.

    Args:
        user: Mapping read with ``.get`` for the keys ``uid``, ``username``
            and ``password``.

    Returns:
        The serialized token as a ``str``.
    """
    # NOTE(review): the stored password value is embedded in the token
    # payload, which is signed but not encrypted - confirm this is intended.
    data = {
        'id': user.get('uid'),
        'username': user.get('username'),
        'password': user.get('password'),
    }
    return serializer.dumps(data).decode('utf-8')


def _client_ip(request):
    """Return the peer host of *request*, or None when no client info exists."""
    client = request.client
    return client.host if client else None


def _log_login_attempt(username, passed, role_type, login_ip):
    """Append one record describing a login attempt to 'UserLoginLog'."""
    Global.mdb.add('UserLoginLog', {
        'username': username,
        'is_login': 'Pass' if passed else 'Fail',
        'role_type': role_type,
        'login_at': methods.now_ts(),
        'login_ip': login_ip,
    })


@router.post('/v1/token')
async def get_token(request: Request, response: Response):
    """Authenticate a username/password pair and issue a token.

    The JSON request body is expected to carry ``username`` and ``password``.
    Every attempt, successful or not, is recorded in 'UserLoginLog'.

    Args:
        request: Incoming request; its JSON body and client host are read.
        response: Unused; kept so the handler signature stays unchanged.

    Returns:
        A 401 ``JSONResponse`` with ``code`` 2 (unknown user) or 3 (wrong
        password) on failure; otherwise a ``JSONResponse`` with ``code`` 0,
        the user data and the token (also sent in the ``authorization``
        header).
    """
    login_ip = _client_ip(request)
    methods.debug_log('v1.get_token.28', f"#now at {methods.now_string()}, ip: {login_ip}")

    # --- check key --- # debug
    # node_api = NodeApi()
    # if not node_api.verify_key():
    #     return JSONResponse(status_code=401, content=dict(message='not verified!', code=1))

    # --- get params ---
    # A malformed or non-object body is treated like missing credentials so
    # the caller gets the normal 401 instead of an unhandled exception.
    try:
        params = await request.json()
    except ValueError:
        params = None
    if not isinstance(params, dict):
        params = {}
    username = params.get('username')
    password = params.get('password')

    # Only plain strings reach the database filter: a JSON object here would
    # otherwise be forwarded as a query operator (presumably to MongoDB).
    user = None
    if isinstance(username, str):
        user = Global.mdb.get_one('UserInfo', {'username': username})

    # if user:
    #     role = Global.mdb.get_one_by_id('UserRole', user.get('role_id'))
    #     role_name = role.get('role_name')
    #     role_type = role.get('role_type')
    # else:
    #     role_name = ''
    #     role_type = 0

    # --- evaluate credentials ---
    if not user:
        code = 2
    elif not isinstance(password, str) or not check_password_hash(user['password'], password):
        code = 3
    else:
        code = 0

    # --- login log ---
    # role_type is only known when the user exists; the original dereferenced
    # `user` here even when it was None, which crashed on unknown usernames.
    role_type = user.get('role_type') if user else None
    _log_login_attempt(
        username if isinstance(username, str) else None,
        code == 0,
        role_type,
        login_ip,
    )

    # --- login failed ---
    if code:
        return JSONResponse(status_code=401, content=dict(message='unauthorized access!', code=code))

    # --- make token ---
    data = {
        'id': str(user['_id']),
        'username': user['username'],
        'password': user['password'],
    }
    token = serializer.dumps(data).decode('utf-8')
    content = dict(
        # message='authorization passed.',
        # username=username,
        # uid=str(user['_id']),
        # rid=user.get('role_id'),
        # code=0
        code=0,
        message='authorization passed.',
        data={
            'uid': str(user['_id']),
            'username': username,
            'role': _ROLE_NAMES.get(str(role_type)),
            'role_type': role_type,
            'token': token,
        }
    )
    headers = {'authorization': token}
    return JSONResponse(content=content, headers=headers)


async def login_required(request: Request):
    """Check the login token of *request*.

    WARNING: token verification is currently disabled. The function ignores
    the request and always returns ``{'skip_is': True}``. The intended
    validation is preserved below as comments and should be restored for
    normal operation.

    Args:
        request: Incoming request (currently unused).

    Returns:
        ``{'skip_is': True}``.
    """
    # methods.debug_log('token.login_required.115', f"#code: {code}")

    # --- check ---
    # if request.method == 'POST':
    #     sources = await request.json()
    #     tag = sources.get('tag', 'v1')
    #     code = int(sources.get('code'))
    #     # methods.debug_log('token.login_required', f"m-107: code -> {code} | token -> {token}")
    #     if not token and tag == 'v3' and code in [1102, 8201]:
    #         methods.debug_log('token.login_required', f"m-103: code -> {code}")
    #         superuser = Global.mdb.get_one('User', {'username': 'admin'})
    #         superuser = Global.mdb.get_one('UserInfo', {'username': 'admin'})
    #         return {
    #             'uid': str(superuser.get('_id')),
    #             'username': 'admin',
    #             'password': 'admin',
    #             'role_id': superuser.get('role_id'),
    #             'skip_is': True,
    #         }

    # --- check --- TODO: token verification is disabled; re-enable in normal operation
    # if not token:
    #     # raise HTTPException(status_code=401, detail='unauthorized access!')
    #     raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='4'))

    # --- fill --- TODO: token verification is disabled; re-enable in normal operation
    # try:
    #     data = serializer.loads(token)
    #     user = Global.mdb.get_one_by_id('User', data['id'])
    #     role_id = user.get('role_id')
    #     # role_acl = Global.mdb.get_one_by_id('UserRole', role_id).get('role_acl')
    #     return {
    #         'uid': data['id'],
    #         'username': data['username'],
    #         'password': data['password'],
    #         'role_id': role_id,
    #     }
    # except Exception as e:
    #     # raise HTTPException(status_code=401, detail='unauthorized access!')
    #     raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='5'))

    # TODO: this unconditional bypass should be removed in normal operation
    return {'skip_is': True}