123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
- from werkzeug.security import check_password_hash
- from fastapi import APIRouter, Request, Response
- from fastapi import HTTPException, Header
- from fastapi.responses import JSONResponse
- from hub import methods, Global
# Router on which the endpoints of this module are registered.
router = APIRouter()

# mdb = Global.get_mongodb_client()

# Signer used for login tokens. The release lifetime is one day, expressed in
# seconds; the debug configuration below used a 100-day lifetime instead.
# serializer = Serializer(secret_key='casper.com@2021', expires_in=86400 * 100)  # debug
# NOTE(review): the signing key is hard-coded in source -- consider loading it
# from configuration. Changing it invalidates tokens that were already issued.
serializer = Serializer(secret_key="casper.com@2021", expires_in=24 * 60 * 60)  # release
def get_token_by_user(user):
    """Generate a signed token string for ``user``.

    The payload is built from ``user.get('uid')``, ``user.get('username')``
    and ``user.get('password')``, stored under the keys ``id``, ``username``
    and ``password``. It is signed with the module-level ``serializer`` and
    the result is decoded as UTF-8 before being returned.
    """
    payload = dict(
        id=user.get('uid'),
        username=user.get('username'),
        password=user.get('password'),
    )
    signed = serializer.dumps(payload)
    return signed.decode('utf-8')
@router.post('/v1/token')
async def get_token(request: Request, response: Response):
    """Issue a login token for the credentials in the JSON request body.

    The body is read as JSON and its ``username`` and ``password`` entries
    are checked against the ``UserInfo`` record with the same username.
    Every attempt, successful or not, is written to ``UserLoginLog``.

    Args:
        request: Incoming request; supplies the JSON body and the client IP.
        response: Unused; kept so the endpoint signature stays unchanged.

    Returns:
        On success, a JSON response with ``code`` 0 and a ``data`` object
        holding ``uid``, ``username``, ``role``, ``role_type`` and ``token``;
        the token is also sent in the ``authorization`` response header.
        On failure, a 401 JSON response whose ``code`` is 2 when no user
        matches the username and 3 when the password does not match (a
        missing or non-string password counts as a mismatch).
    """
    methods.debug_log('v1.get_token.28', f"#now at {methods.now_string()}, ip: {request.client.host}")
    # NOTE: a NodeApi key verification step used to run here; it is disabled.

    # --- get params ---
    params = await request.json()
    username = params.get('username')
    password = params.get('password')
    user = Global.mdb.get_one('UserInfo', {'username': username})
    # Display names per role_type -- 1: super administrator,
    # 2: ordinary administrator, 3: ordinary user.
    role_info = {
        '1': '超级管理员',
        '2': '普通管理员',
        '3': '普通用户',
    }

    # --- decide the outcome: 0 = success, 2 = unknown user, 3 = bad password ---
    if not user:
        code = 2
    elif not isinstance(password, str) or not check_password_hash(user['password'], password):
        # The type guard keeps a missing/non-string password from reaching
        # check_password_hash, which expects text.
        code = 3
    else:
        code = 0

    # --- login log (written for every attempt) ---
    # When no user matched there is no record to read role_type from; the
    # original dereferenced the empty lookup result here and crashed.
    role_type = user.get('role_type') if user else None
    Global.mdb.add('UserLoginLog', {
        'username': username,
        'is_login': 'Fail' if code else 'Pass',
        'role_type': role_type,
        'login_at': methods.now_ts(),
        'login_ip': request.client.host,
    })

    # --- login failed ---
    if code:
        return JSONResponse(status_code=401, content=dict(message='unauthorized access!', code=code))

    # --- make token ---
    uid = str(user['_id'])
    data = {
        'id': uid,
        'username': user['username'],
        'password': user['password'],
    }
    token = serializer.dumps(data).decode('utf-8')
    content = dict(
        code=0,
        message='authorization passed.',
        data={
            'uid': uid,
            'username': username,
            'role': role_info.get(str(role_type)),
            'role_type': role_type,
            'token': token,
        },
    )
    headers = {'authorization': token}
    return JSONResponse(content=content, headers=headers)
async def login_required(request: Request):
    """
    Check the login token.

    NOTE: token verification is currently switched off. The function
    ignores ``request`` and always returns ``{'skip_is': True}``; the real
    checks are kept below as commented-out code, which refers to a ``token``
    name that is not defined in this function as written.
    """
    # --- check ---
    # if request.method == 'POST':
    #     sources = await request.json()
    #     tag = sources.get('tag', 'v1')
    #     code = int(sources.get('code'))
    #     # methods.debug_log('token.login_required', f"m-107: code -> {code} | token -> {token}")
    #     if not token and tag == 'v3' and code in [1102, 8201]:
    #         methods.debug_log('token.login_required', f"m-103: code -> {code}")
    #         superuser = Global.mdb.get_one('User', {'username': 'admin'})
    #         superuser = Global.mdb.get_one('UserInfo', {'username': 'admin'})
    #         return {
    #             'uid': str(superuser.get('_id')),
    #             'username': 'admin',
    #             'password': 'admin',
    #             'role_id': superuser.get('role_id'),
    #             'skip_is': True,
    #         }
    # --- check --- TODO: token verification is disabled here; it should normally be enabled
    # if not token:
    #     # raise HTTPException(status_code=401, detail='unauthorized access!')
    #     raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='4'))
    # --- fill --- TODO: token verification is disabled here; it should normally be enabled
    # try:
    #     data = serializer.loads(token)
    #     user = Global.mdb.get_one_by_id('User', data['id'])
    #     role_id = user.get('role_id')
    #     # role_acl = Global.mdb.get_one_by_id('UserRole', role_id).get('role_acl')
    #     return {
    #         'uid': data['id'],
    #         'username': data['username'],
    #         'password': data['password'],
    #         'role_id': role_id,
    #     }
    # except Exception as e:
    #     # raise HTTPException(status_code=401, detail='unauthorized access!')
    #     raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='5'))
    # TODO: this unconditional bypass should normally be disabled
    return {'skip_is': True}
|