"""HTTP entry points that dispatch numeric API codes to lazily imported handlers."""
import importlib

from fastapi import APIRouter, Request, Response, Depends
from fastapi.responses import FileResponse
from starlette.formparsers import MultiPartParser as StartletteMultiPartParser

from key import v1 as key_v1
from hub import methods, Global

router = APIRouter()

# Maps an API tag to {numeric code: "<tag>.<module>.<function>"}; the dotted
# path is resolved under the ``api`` package by ``_get_method_by_code``.
config_dict = {
    'v6': {
        # --- user management ---
        1001: 'v6.code1000.code1001',  # add a user
        1002: 'v6.code1000.code1002',  # query the user list (paginated)
        1003: 'v6.code1000.code1003',  # delete the specified user
        1004: 'v6.code1000.code1004',  # disable the specified user
        1005: 'v6.code1000.code1005',  # enable the specified user
        1006: 'v6.code1000.code1006',  # get the specified user's details
        1007: 'v6.code1000.code1007',  # modify the specified user's information
        # --- vehicle management ---
        2001: 'v6.code2000.code2001',  # add a vehicle
        2002: 'v6.code2000.code2002',  # query the vehicle list (paginated)
        2003: 'v6.code2000.code2003',  # modify the specified vehicle's information
        2004: 'v6.code2000.code2004',  # forbid remote operation of the specified vehicle
        2005: 'v6.code2000.code2005',  # allow remote operation of the specified vehicle
        2006: 'v6.code2000.code2006',  # delete the specified work vehicle
        2007: 'v6.code2000.code2007',  # get the specified work vehicle's details
        # --- logs ---
        3001: 'v6.code3000.code3001',  # query the driver operation record list
        3002: 'v6.code3000.code3002',  # download the specified driver's operation log
    }
}

# Cache of already-resolved handlers: {tag: {code: method}}.
methods_dict = {}


def _get_method_by_code(code: int, tag: str):
    """Return the handler registered for ``code`` under ``tag``.

    The handler's module is imported only the first time the code is
    requested; afterwards the resolved callable is served from
    ``methods_dict``.

    Args:
        code: Numeric API code, a key of ``config_dict[tag]``.
        tag: API tag, a key of ``config_dict``.

    Returns:
        The attribute named by the last component of the configured dotted
        path, looked up on module ``api.<tag>.<module>``.

    Raises:
        KeyError: If ``tag`` or ``code`` is not present in ``config_dict``.
        ImportError: If the configured module cannot be imported.
        AttributeError: If the module does not define the configured name.
    """
    # --- cache hit ---
    cached_for_tag = methods_dict.get(tag)
    if cached_for_tag is not None and code in cached_for_tag:
        return cached_for_tag[code]

    # --- validate before touching the cache ---
    # ``tag`` comes from the request; creating a cache entry for every unknown
    # tag would let callers grow ``methods_dict`` without bound.
    routes = config_dict.get(tag)
    if routes is None:
        raise KeyError(f"unknown api tag: {tag!r}")
    method_path = routes.get(code)
    if method_path is None:
        raise KeyError(f"unknown api code: {code!r} (tag: {tag!r})")

    # --- get method ---
    package, file_name, method_name = method_path.split('.')
    script = importlib.import_module(f"api.{package}.{file_name}")
    method = getattr(script, method_name)

    # --- set method ---
    methods_dict.setdefault(tag, {})[code] = method
    return method


@router.post('/{tag}/api')
async def post(
        tag: str,
        request: Request,
        response: Response,
        user: dict = Depends(key_v1.login_required),
):
    """Dispatch a JSON request to the handler selected by its ``code`` field.

    The JSON body must contain ``code``; every other field is forwarded to the
    handler as a keyword argument, together with ``g_user_id``,
    ``g_user_name``, ``g_user_pass`` and ``g_role_id`` taken from ``user``.

    Args:
        tag: API tag from the URL path, used to select the handler table.
        request: Incoming request whose body is parsed as JSON.
        response: Response object used to attach the ``authorization`` header.
        user: User record produced by ``key_v1.login_required``.

    Returns:
        The handler's result when it is a ``dict`` or ``FileResponse``;
        otherwise ``{"data": result, "type": <class name>}``. On any failure
        inside the dispatch, a dict with ``code=-1``, an empty ``data`` list,
        a ``message`` and the traceback in ``details``.
    """
    # --- check ---
    # Unless the user record has a truthy 'skip_is', attach the token returned
    # by key_v1.get_token_by_user to the response headers.
    if not user.get('skip_is'):
        response.headers['authorization'] = key_v1.get_token_by_user(user)

    # Pre-initialised so the error logging below works even when the body
    # cannot be parsed.
    code = None
    try:
        # --- get sources ---
        # Parsed inside the try so a malformed body or a missing/non-numeric
        # code yields the same code=-1 payload as any other failure.
        sources = await request.json()
        code = int(sources.get('code'))
        page = sources.get('page', 1)
        size = sources.get('size', 10)
        ban_keys = ['code']

        # --- set global args ---
        sources = {key: val for key, val in sources.items() if key not in ban_keys}
        sources['g_user_id'] = user.get('uid')
        sources['g_user_name'] = user.get('username')
        sources['g_user_pass'] = user.get('password')
        sources['g_role_id'] = user.get('role_id')

        # --- call action ---
        run_at = methods.now_ts()
        method = _get_method_by_code(code=code, tag=tag)
        result = await method(**sources)
        methods.debug_log("api.post.95", f"#{tag}/{code}: use time {round(methods.now_ts() - run_at, 2)}s")

        # --- check ---
        # Codes listed here have their full 'data' list sliced down to the
        # requested page after the handler returns. The list is currently empty.
        if code in [
            # 8201,  # visitor records, list
        ] and result.__class__.__name__ == 'dict':
            return dict(
                code=result.get('code'),
                data=result.get('data', [])[(page - 1) * size: page * size],
                page=page,
                size=size,
                total=len(result.get('data', [])),
            )
        elif result.__class__.__name__ in ['FileResponse', 'dict']:
            return result
        else:
            return dict(data=result, type=result.__class__.__name__)
    except Exception as exception:
        trace = methods.trace_log()
        methods.debug_log('api.post.115', f"#tag: {tag}, #code: {code}")
        methods.debug_log('api.post.115', f"#exception: {exception}")
        methods.debug_log('api.post.115', f"#traceback: {trace}")
        # NOTE(review): 'details' exposes the server traceback to the client.
        return dict(code=-1, data=[], message=f"something is wrong. [{exception.__class__.__name__}]",
                    details=f"{trace}")


@router.put('/actions')
async def upload(request: Request, user: dict = Depends(key_v1.login_required)):
    """Dispatch a multipart/form-data request to the handler named by its ``code`` field.

    Form fields are collected into keyword arguments for the handler:

    * a field whose name ends in ``list`` is always collected into a list;
    * any other field repeated several times is collected into a flat list;
    * values whose class is named ``UploadFile`` are moved into a ``files``
      dict keyed by field name.

    ``tag`` (default ``'v1'``) and ``code`` are consumed here and not
    forwarded. ``g_user_id``, ``g_user_name``, ``g_user_pass`` and
    ``g_role_id`` are added from ``user``.

    Args:
        request: Incoming multipart request.
        user: User record produced by ``key_v1.login_required``.

    Returns:
        Whatever the selected handler returns, or on any failure a dict with
        ``code=-1``, a ``message`` and the traceback in ``details``.
    """
    try:
        # --- get sources ---
        sources = dict()
        parser = StartletteMultiPartParser(request.headers, request.stream())
        params = await parser.parse()

        # --- set sources --- form-data list handling
        for key, value in params.multi_items():
            if key.endswith('list'):
                # --- check list --- '...list' fields are lists even with one value
                sources.setdefault(key, []).append(value)
            elif key not in sources:
                sources[key] = value
            elif isinstance(sources[key], list):
                # Third and later occurrences extend the list instead of nesting it.
                sources[key].append(value)
            else:
                # --- make list --- second occurrence of a plain field
                sources[key] = [sources[key], value]

        # --- clean ---
        # pop() so that an omitted 'tag' really falls back to the default
        # instead of failing on a later delete.
        tag = sources.pop('tag', 'v1')
        code = int(sources.pop('code', None))

        # --- set sources ---
        sources['g_user_id'] = user.get('uid')
        sources['g_user_name'] = user.get('username')
        sources['g_user_pass'] = user.get('password')
        sources['g_role_id'] = user.get('role_id')

        # --- set files ---
        files = {key: value for key, value in sources.items() if value.__class__.__name__ == 'UploadFile'}
        for key in files:
            del sources[key]
        sources['files'] = files

        # --- call action ---
        return await _get_method_by_code(code=code, tag=tag)(**sources)
    except Exception as exception:
        trace = methods.trace_log()
        methods.debug_log('api.upload.176', f"#exception: {exception}")
        methods.debug_log('api.upload.176', f"#traceback: {trace}")
        # NOTE(review): 'details' exposes the server traceback to the client.
        return dict(code=-1, message=f"something is wrong. [{exception.__class__.__name__}]",
                    details=f"{trace}")


@router.get('/{tag}/api')
async def download(request: Request, response: Response, tag: str):
    """Run the handler selected by the ``code`` query parameter and send its file.

    All query parameters are forwarded to the handler as keyword arguments.
    When the handler's result has ``code == 0`` the file at
    ``result['file_path']`` is returned as an ``application/octet-stream``
    download named ``result['file_name']``; otherwise the result is returned
    unchanged.

    Args:
        request: Incoming request; its query parameters drive the dispatch.
        response: Unused here; kept as part of the route signature.
        tag: API tag from the URL path, used to select the handler table.

    Returns:
        A ``FileResponse``, the handler's own result, or on any failure a dict
        with ``code=-1``, a ``message`` and the traceback in ``details``.
    """
    # NOTE(review): unlike post/upload this route has no login_required
    # dependency and forwards 'code' to the handler - confirm both are intended.
    try:
        # --- get ---
        params = request.query_params
        methods.debug_log('api.download|185', f"#code: {params.get('code')} | {type(params.get('code'))}")
        methods.debug_log('api.download|185', f"#tag: {tag}")
        methods.debug_log('api.download|185', f"#params: {params}")
        method = _get_method_by_code(code=int(params.get('code')), tag=tag)
        result = await method(**params)

        if result.get('code') != 0:
            return result

        # --- download prompt --- a filename is passed so the file is served
        # as a named download.
        return FileResponse(result.get('file_path'), media_type="application/octet-stream",
                            filename=result.get('file_name'))
    except Exception as exception:
        trace = methods.trace_log()
        methods.debug_log('api.download|200', f"#exception: {exception}")
        methods.debug_log('api.download|200', f"#traceback: {trace}")
        # NOTE(review): 'details' exposes the server traceback to the client.
        return dict(code=-1, message=f"something is wrong. [{exception.__class__.__name__}]",
                    details=f"{trace}")