
import os

from fastapi import APIRouter, Request, Response
from fastapi import HTTPException, Header
from fastapi.responses import JSONResponse
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from werkzeug.security import check_password_hash

from hub import methods, Global
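router = APIRouter()

# Signer for the access tokens issued by /v1/token and (when enabled) checked
# by login_required. itsdangerous signs the payload; it does not encrypt it.
#
# The signing key used to be a literal in this file. It is now read from the
# environment (variable name chosen here, not yet part of any deployment
# config) and falls back to the shipped literal only so that existing
# deployments keep validating the tokens they already issued. Anyone holding
# this value can mint a token for any user, so set the variable, rotate the
# key, and remove the fallback once deployments are updated.
serializer = Serializer(
    secret_key=os.environ.get('TOKEN_SECRET_KEY', 'casper.com@2021'),
    expires_in=86400,  # one day; a 100-day value was used for debugging
)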
def get_token_by_user(user):
    """Return a signed, UTF-8 decoded token for ``user``.

    ``user`` is a dict; the payload is built with ``user.get`` from the keys
    ``uid`` (emitted as ``id``), ``username`` and ``password``, so a missing key
    becomes ``None`` instead of raising.

    NOTE(review): the token is signed, not encrypted -- whatever is stored
    under ``password`` is readable by anyone who holds the token.
    """
    claim_sources = (
        ('id', 'uid'),
        ('username', 'username'),
        ('password', 'password'),
    )
    payload = {claim: user.get(source) for claim, source in claim_sources}
    signed = serializer.dumps(payload)
    return signed.decode('utf-8')
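def _login_log_entry(username, user, is_login, login_ip):
    """Build one ``UserLoginLog`` document.

    ``user`` is the ``UserInfo`` document or ``None`` when the username was not
    found; in that case ``role_type`` is recorded as ``None`` (the original code
    dereferenced ``None`` here and returned a 500 for every unknown username).
    """
    return {
        'username': username,
        'is_login': is_login,
        'role_type': user.get('role_type') if user else None,
        'login_at': methods.now_ts(),
        'login_ip': login_ip,
    }


def _is_disabled(user):
    """Return True when the account's ``state`` field equals 1.

    The original test was ``int(user.get('state')) == 1``, which raised when the
    field was missing or not numeric. An unreadable state now fails closed
    (reported as disabled, code 4) instead of producing a 500.
    NOTE(review): what a missing ``state`` should mean is not visible in this
    file -- confirm against the UserInfo schema.
    """
    try:
        return int(user.get('state')) == 1
    except (TypeError, ValueError):
        return True


@router.post('/v1/token')
async def get_token(request: Request, response: Response):
    """Issue a signed access token for a username/password pair.

    Request body (JSON): ``{"username": str, "password": str}``.

    Responses:
        200 -- ``code`` 0; ``data`` holds uid, username, role (display name),
               role_type and the token, which is also echoed in the
               ``authorization`` response header.
        401 -- ``code`` 2 (unknown user, or credentials that are not plain
               strings / body that is not a JSON object), 3 (wrong password)
               or 4 (account disabled, see ``_is_disabled``).

    Every outcome except "disabled" is written to ``UserLoginLog`` together
    with the client IP. ``response`` is part of the route signature but unused.

    NOTE(review): the 2/3/4 codes tell a caller whether a username exists even
    though the message text is the same; collapsing them would change the
    client contract, so they are kept -- rate limiting this route upstream is
    the compensating control.
    """
    # request.client can be None under some ASGI servers/test clients.
    client_ip = request.client.host if request.client else None
    methods.debug_log('v1.get_token.28', f"#now at {methods.now_string()}, ip: {client_ip}")

    # role_type number -> display name: 1 super admin, 2 admin, 3 regular user.
    role_info = {
        '1': '超级管理员',
        '2': '普通管理员',
        '3': '普通用户',
    }

    # A malformed body or a non-object body is treated like bad credentials
    # (Starlette raises a ValueError subclass on undecodable JSON).
    try:
        params = await request.json()
    except ValueError:
        params = {}
    if not isinstance(params, dict):
        params = {}
    username = params.get('username')
    password = params.get('password')

    # Only plain strings may reach the database filter and the hash check: a
    # JSON object or array in either field would otherwise be passed straight
    # into the query document, and check_password_hash expects str.
    if isinstance(username, str) and isinstance(password, str):
        user = Global.mdb.get_one('UserInfo', {'username': username})
    else:
        user = None

    if not user:
        Global.mdb.add('UserLoginLog', _login_log_entry(username, None, 'Fail', client_ip))
        code = 2
    elif not check_password_hash(user['password'], password):
        Global.mdb.add('UserLoginLog', _login_log_entry(username, user, 'Fail', client_ip))
        code = 3
    elif _is_disabled(user):
        code = 4
    else:
        Global.mdb.add('UserLoginLog', _login_log_entry(username, user, 'Pass', client_ip))
        code = 0

    if code:
        return JSONResponse(status_code=401, content=dict(message='unauthorized access!', code=code))

    # The token payload is signed, not encrypted, so these claims -- including
    # the stored password value -- are readable by anyone holding the token.
    # The keys are kept because the (currently disabled) login_required reads
    # them; an opaque session id would be the safer claim.
    token = serializer.dumps({
        'id': str(user['_id']),
        'username': user['username'],
        'password': user['password'],
    }).decode('utf-8')

    content = dict(
        code=0,
        message='authorization passed.',
        data={
            'uid': str(user['_id']),
            'username': username,
            'role': role_info.get(str(user.get('role_type'))),
            'role_type': user.get('role_type'),
            'token': token,
        }
    )
    headers = {'authorization': token}
    return JSONResponse(content=content, headers=headers)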
  100. async def login_required(request: Request):
  101. """
  102. 检查登录token
  103. methods.debug_log('token.login_required.115', f"#code: {code}")
  104. """
  105. # --- check ---
  106. # if request.method == 'POST':
  107. # sources = await request.json()
  108. # tag = sources.get('tag', 'v1')
  109. # code = int(sources.get('code'))
  110. # # methods.debug_log('token.login_required', f"m-107: code -> {code} | token -> {token}")
  111. # if not token and tag == 'v3' and code in [1102, 8201]:
  112. # methods.debug_log('token.login_required', f"m-103: code -> {code}")
  113. # superuser = Global.mdb.get_one('User', {'username': 'admin'})
  114. # superuser = Global.mdb.get_one('UserInfo', {'username': 'admin'})
  115. # return {
  116. # 'uid': str(superuser.get('_id')),
  117. # 'username': 'admin',
  118. # 'password': 'admin',
  119. # 'role_id': superuser.get('role_id'),
  120. # 'skip_is': True,
  121. # }
  122. # --- check --- todo 屏蔽token验证,正常情况下应放开
  123. # if not token:
  124. # # raise HTTPException(status_code=401, detail='unauthorized access!')
  125. # raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='4'))
  126. # --- fill --- todo 屏蔽token验证,正常情况下应放开
  127. # try:
  128. # data = serializer.loads(token)
  129. # user = Global.mdb.get_one_by_id('User', data['id'])
  130. # role_id = user.get('role_id')
  131. # # role_acl = Global.mdb.get_one_by_id('UserRole', role_id).get('role_acl')
  132. # return {
  133. # 'uid': data['id'],
  134. # 'username': data['username'],
  135. # 'password': data['password'],
  136. # 'role_id': role_id,
  137. # }
  138. # except Exception as e:
  139. # # raise HTTPException(status_code=401, detail='unauthorized access!')
  140. # raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='5'))
  141. # todo 正常情况下应屏蔽
  142. return {'skip_is': True}