123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
- from werkzeug.security import check_password_hash
- from fastapi import APIRouter, Request, Response
- from fastapi import HTTPException, Header
- from fastapi.responses import JSONResponse
- from hub import methods, Global
- router = APIRouter()
- serializer = Serializer(secret_key='casper.com@2021', expires_in=86400)
- def get_token_by_user(user):
- """生成token"""
- data = {
- 'id': user.get('uid'),
- 'username': user.get('username'),
- 'password': user.get('password'),
- }
- return serializer.dumps(data).decode('utf-8')
- @router.post('/v1/token')
- async def get_token(request: Request, response: Response):
- """获取令牌"""
- methods.debug_log('v1.get_token.28', f"#now at {methods.now_string()}, ip: {request.client.host}")
-
-
-
-
-
- params = await request.json()
- username = params.get('username')
- password = params.get('password')
-
- user = Global.mdb.get_one('UserInfo', {'username': username})
- role_info = {1: '超级管理员'}
-
-
-
-
-
-
-
-
- if not user:
- data = {
- 'username': username,
- 'is_login': 'Fail',
- 'role_type': user.get('role_type'),
- 'login_at': methods.now_ts(),
- 'login_ip': request.client.host,
- }
- Global.mdb.add('UserLoginLog', data)
- code = 2
- elif not check_password_hash(user['password'], password):
- data = {
- 'username': username,
- 'is_login': 'Fail',
- 'role_type': user.get('role_type'),
- 'login_at': methods.now_ts(),
- 'login_ip': request.client.host,
- }
- Global.mdb.add('UserLoginLog', data)
- code = 3
- else:
- data = {
- 'username': username,
- 'role_type': user.get('role_type'),
- 'is_login': 'Pass',
- 'login_at': methods.now_ts(),
- 'login_ip': request.client.host,
- }
- Global.mdb.add('UserLoginLog', data)
- code = 0
-
- if code:
- return JSONResponse(status_code=401, content=dict(message='unauthorized access!', code=code))
-
- data = {
- 'id': str(user['_id']),
- 'username': user['username'],
- 'password': user['password'],
- }
- token = serializer.dumps(data).decode('utf-8')
- content = dict(
-
-
-
-
- code=0,
- message='authorization passed.',
- data={
- 'uid': str(user['_id']),
- 'username': username,
- 'role': role_info.get(user.get('role_type')),
- 'role_type': user.get('role_type'),
- 'token': token,
- }
- )
- headers = {'authorization': token}
- return JSONResponse(content=content, headers=headers)
- async def login_required(request: Request):
- """
- 检查登录token
- methods.debug_log('token.login_required.115', f"#code: {code}")
- """
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- return {'skip_is': True}
|