v1.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  2. from werkzeug.security import check_password_hash
  3. from fastapi import APIRouter, Request, Response
  4. from fastapi import HTTPException, Header
  5. from fastapi.responses import JSONResponse
  6. from hub import methods, Global
  7. router = APIRouter()
  8. # mdb = Global.get_mongodb_client()
  9. # serializer = Serializer(secret_key='casper.com@2021', expires_in=86400 * 100) # debug
  10. serializer = Serializer(secret_key='casper.com@2021', expires_in=86400) # release
  11. def get_token_by_user(user):
  12. """生成token"""
  13. data = {
  14. 'id': user.get('uid'),
  15. 'username': user.get('username'),
  16. 'password': user.get('password'),
  17. }
  18. return serializer.dumps(data).decode('utf-8')
  19. @router.post('/v1/token')
  20. async def get_token(request: Request, response: Response):
  21. """获取令牌"""
  22. methods.debug_log('v1.get_token.28', f"#now at {methods.now_string()}, ip: {request.client.host}")
  23. # --- check key --- # debug
  24. # node_api = NodeApi()
  25. # if not node_api.verify_key():
  26. # return JSONResponse(status_code=401, content=dict(message='not verified!', code=1))
  27. # --- get params ---
  28. params = await request.json()
  29. username = params.get('username')
  30. password = params.get('password')
  31. user = Global.mdb.get_one('UserInfo', {'username': username})
  32. role_info = {
  33. '1': '超级管理员',
  34. '2': '普通管理员',
  35. '3': '普通用户',
  36. } # 1: 超级管理员 2: 普通管理员 3: 普通用户
  37. # if user:
  38. # role = Global.mdb.get_one_by_id('UserRole', user.get('role_id'))
  39. # role_name = role.get('role_name')
  40. # role_type = role.get('role_type')
  41. # else:
  42. # role_name = ''
  43. # role_type = 0
  44. # --- fail log---
  45. if not user:
  46. data = {
  47. 'username': username,
  48. 'is_login': 'Fail',
  49. 'role_type': user.get('role_type'),
  50. 'login_at': methods.now_ts(),
  51. 'login_ip': request.client.host,
  52. }
  53. Global.mdb.add('UserLoginLog', data)
  54. code = 2
  55. elif not check_password_hash(user['password'], password):
  56. data = {
  57. 'username': username,
  58. 'is_login': 'Fail',
  59. 'role_type': user.get('role_type'),
  60. 'login_at': methods.now_ts(),
  61. 'login_ip': request.client.host,
  62. }
  63. Global.mdb.add('UserLoginLog', data)
  64. code = 3
  65. else:
  66. data = {
  67. 'username': username,
  68. 'role_type': user.get('role_type'),
  69. 'is_login': 'Pass',
  70. 'login_at': methods.now_ts(),
  71. 'login_ip': request.client.host,
  72. }
  73. Global.mdb.add('UserLoginLog', data)
  74. code = 0
  75. # --- 登录失败 ---
  76. if code:
  77. return JSONResponse(status_code=401, content=dict(message='unauthorized access!', code=code))
  78. # --- make token ---
  79. data = {
  80. 'id': str(user['_id']),
  81. 'username': user['username'],
  82. 'password': user['password'],
  83. }
  84. token = serializer.dumps(data).decode('utf-8')
  85. content = dict(
  86. # message='authorization passed.',
  87. # username=username,
  88. # uid=str(user['_id']),
  89. # rid=user.get('role_id'), code=0
  90. code=0,
  91. message='authorization passed.',
  92. data={
  93. 'uid': str(user['_id']),
  94. 'username': username,
  95. 'role': role_info.get(str(user.get('role_type'))),
  96. 'role_type': user.get('role_type'),
  97. 'token': token,
  98. }
  99. )
  100. headers = {'authorization': token}
  101. return JSONResponse(content=content, headers=headers)
  102. async def login_required(request: Request):
  103. """
  104. 检查登录token
  105. methods.debug_log('token.login_required.115', f"#code: {code}")
  106. """
  107. # --- check ---
  108. # if request.method == 'POST':
  109. # sources = await request.json()
  110. # tag = sources.get('tag', 'v1')
  111. # code = int(sources.get('code'))
  112. # # methods.debug_log('token.login_required', f"m-107: code -> {code} | token -> {token}")
  113. # if not token and tag == 'v3' and code in [1102, 8201]:
  114. # methods.debug_log('token.login_required', f"m-103: code -> {code}")
  115. # superuser = Global.mdb.get_one('User', {'username': 'admin'})
  116. # superuser = Global.mdb.get_one('UserInfo', {'username': 'admin'})
  117. # return {
  118. # 'uid': str(superuser.get('_id')),
  119. # 'username': 'admin',
  120. # 'password': 'admin',
  121. # 'role_id': superuser.get('role_id'),
  122. # 'skip_is': True,
  123. # }
  124. # --- check --- todo 屏蔽token验证,正常情况下应放开
  125. # if not token:
  126. # # raise HTTPException(status_code=401, detail='unauthorized access!')
  127. # raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='4'))
  128. # --- fill --- todo 屏蔽token验证,正常情况下应放开
  129. # try:
  130. # data = serializer.loads(token)
  131. # user = Global.mdb.get_one_by_id('User', data['id'])
  132. # role_id = user.get('role_id')
  133. # # role_acl = Global.mdb.get_one_by_id('UserRole', role_id).get('role_acl')
  134. # return {
  135. # 'uid': data['id'],
  136. # 'username': data['username'],
  137. # 'password': data['password'],
  138. # 'role_id': role_id,
  139. # }
  140. # except Exception as e:
  141. # # raise HTTPException(status_code=401, detail='unauthorized access!')
  142. # raise HTTPException(status_code=401, headers=dict(message='unauthorized access!', code='5'))
  143. # todo 正常情况下应屏蔽
  144. return {'skip_is': True}