|
- from starlette.formparsers import MultiPartParser as StartletteMultiPartParser
- from fastapi import APIRouter, Request, Response, Depends
- from fastapi.responses import FileResponse
- from key import v1 as key_v1
- from hub import methods, Global
- import importlib
# Router instance that the endpoint decorators in this module register on.
router = APIRouter()
# Registry of API codes, grouped by version tag.
# Each value is a dotted path "<package>.<module>.<function>" that
# _get_method_by_code resolves under the "api." package on first use.
config_dict = {
    'v5': {
        # --- slag-pot carriers ---
        2001: 'v5.hs2000.code_2001',  # get the status of all slag-pot carriers
        2002: 'v5.hs2000.code_2002',  # ignition for a specified carrier
        2003: 'v5.hs2000.code_2003',  # engine shut-off for a specified carrier
        2004: 'v5.hs2000.code_2004',  # establish (or switch) remote-operation permission for a carrier
        # 2005: 'v5.hs2000.code_2005',  # create and run an autonomous-driving task [disabled]
        2101: 'v5.hs2000.code_2101',  # add a carrier
        2102: 'v5.hs2000.code_2102',  # modify a carrier
        2103: 'v5.hs2000.code_2103',  # delete a carrier
        # --- tasks ---
        3001: 'v5.hs3000.code_3001',  # get the task list (paginated)
        # 3002: 'v5.hs3000.code_3002',  # pause a task [disabled]
        3003: 'v5.hs3000.code_3003',  # cancel a task
        3004: 'v5.hs3000.code_3004',  # create and run a task
        3005: 'v5.hs3000.code_3005',  # get the details of a specified task
        # --- site data ---
        4001: 'v5.hs4000.code_4001',  # get status data for all slag pots
        # 4002: 'v5.hs4000.code_4002',  # todo: get slag-receiving port status data
        # 4003: 'v5.hs4000.code_4003',  # todo: get slag-dumping port status data
        # --- alarms ---
        5001: 'v5.hs5000.code_5001',  # get the alarm list
    },
}

# Cache of handlers that have already been imported: {<tag>: {<code>: <callable>}}
methods_dict = {}
def _get_method_by_code(code, tag):
    """Resolve the handler registered for ``code`` under ``tag``, importing lazily.

    The handler's module is imported only the first time its code is
    requested; the resolved attribute is then cached in ``methods_dict`` so
    later calls skip the import.

    Args:
        code: API code, a key of ``config_dict[tag]``.
        tag: API version tag, a key of ``config_dict``.

    Returns:
        The attribute loaded from ``api.<package>.<module>``, or ``None`` when
        the tag/code is not registered or the import fails. Failures are
        written to the debug log instead of being raised.
    """
    try:
        # --- cache hit ---
        cached_for_tag = methods_dict.get(tag)
        if cached_for_tag is not None and code in cached_for_tag:
            return cached_for_tag[code]
        # --- get method ---
        # An unknown tag or code raises KeyError here and is reported by the
        # handler below, like any other resolution failure.
        method_path = config_dict[tag][code]
        # Do not rebind ``tag``: the cache must be keyed by the requested tag.
        package, file_name, method_name = method_path.split('.')
        script = importlib.import_module(f"api.{package}.{file_name}")
        method = getattr(script, method_name)
        # --- set method ---
        # The bucket is created only on success, so unknown tags coming from
        # the URL never leave empty entries behind in the cache.
        methods_dict.setdefault(tag, {})[code] = method
        return method
    except Exception as exception:
        methods.debug_log('api._get_method_by_code.69', f"#tag: {tag}, #code: {code}")
        methods.debug_log('api._get_method_by_code.69', f"#exception: {exception.__class__.__name__}")
        methods.debug_log('api._get_method_by_code.69', f"#traceback: {methods.trace_log()}")
        return None
@router.post('/{tag}/api')
async def post(tag: str, request: Request, response: Response, user: dict = Depends(key_v1.login_required)):
    """Dispatch a JSON API call to the handler registered for ``tag``/``code``.

    The JSON body must contain ``code``; every other body field is forwarded
    to the handler as a keyword argument, together with the ``g_user_*`` /
    ``g_role_id`` values taken from the authenticated ``user``.

    Args:
        tag: API version tag taken from the URL path.
        request: Incoming request; its body is read as JSON.
        response: Outgoing response; receives an ``authorization`` header
            unless ``user['skip_is']`` is truthy.
        user: Dict produced by the ``key_v1.login_required`` dependency.

    Returns:
        The handler result as-is when it is a ``dict`` or ``FileResponse``;
        otherwise ``{'data': result, 'type': <class name>}``. Any failure,
        including a malformed body or an unknown code, yields a dict with
        ``code=-1``, ``reason`` and ``detail``.
    """
    # --- check ---
    if not user.get('skip_is'):
        response.headers['authorization'] = key_v1.get_token_by_user(user)
    # Defined up front so the error log below works even if the body cannot be parsed.
    code = None
    # Codes whose dict result is sliced into pages here; currently none.
    paged_codes = (
        # 8201,  # visitor records, list
    )
    try:
        # --- get sources ---
        # Parsing happens inside the try so that a malformed body or a
        # missing/non-numeric code gets the same error dict as handler failures.
        sources = await request.json()
        code = int(sources.get('code'))
        page = sources.get('page', 1)
        size = sources.get('size', 10)
        # --- set global args ---
        sources = {key: val for key, val in sources.items() if key != 'code'}
        sources['g_user_id'] = user.get('uid')
        sources['g_user_name'] = user.get('username')
        sources['g_user_pass'] = user.get('password')
        sources['g_role_id'] = user.get('role_id')
        # --- call action ---
        run_at = methods.now_ts()
        method = _get_method_by_code(code=code, tag=tag)
        if method is None:
            # The resolver returns None for unknown tag/code or a failed import.
            raise LookupError(f"no handler registered for {tag}/{code}")
        result = await method(**sources)
        methods.debug_log("api.post.101", f"#{tag}/{code}: use time {round(methods.now_ts() - run_at, 2)}s")
        # --- check ---
        if code in paged_codes and isinstance(result, dict):
            rows = result.get('data', [])
            return dict(
                code=result.get('code'),
                data=rows[(page - 1) * size: page * size],
                page=page,
                size=size,
                total=len(rows),
            )
        if isinstance(result, (FileResponse, dict)):
            return result
        return dict(data=result, type=result.__class__.__name__)
    except Exception as exception:
        reason = exception.__class__.__name__
        detail = f"{methods.trace_log()}"
        methods.debug_log('api.post.121', f"#tag: {tag}, #code: {code}")
        methods.debug_log('api.post.121', f"#exception: {reason}")
        methods.debug_log('api.post.121', f"#traceback: {detail}")
        # NOTE(review): ``detail`` sends the traceback text to the client;
        # confirm this is acceptable outside of debugging.
        return dict(code=-1, data=[], reason=reason, detail=detail)
def _collect_form_fields(items):
    """Fold multipart ``(key, value)`` pairs into a dict, grouping repeated keys.

    Keys ending in ``'list'`` always map to a list, even for a single value.
    Any other key maps to its bare value until it repeats; from then on it
    maps to one flat list holding every value seen for that key.
    """
    fields = {}
    for key, value in items:
        if key.endswith('list'):
            fields.setdefault(key, []).append(value)
        elif key not in fields:
            fields[key] = value
        elif isinstance(fields[key], list):
            # Third and later occurrences extend the existing list rather
            # than wrapping it again (which would produce [[a, b], c]).
            fields[key].append(value)
        else:
            fields[key] = [fields[key], value]
    return fields


@router.put('/actions')
async def upload(request: Request, user: dict = Depends(key_v1.login_required)):
    """Dispatch a multipart/form-data API call, passing uploads as ``files``.

    Form fields are folded into keyword arguments (repeated keys become
    lists). ``tag`` (default ``'v1'``) and ``code`` select the handler and
    are removed from the arguments. The ``g_user_*`` / ``g_role_id`` values
    from ``user`` are added, and every top-level value whose class is named
    ``UploadFile`` is moved into a ``files`` dict keyed by field name.

    Args:
        request: Incoming request carrying the multipart body.
        user: Dict produced by the ``key_v1.login_required`` dependency.

    Returns:
        Whatever the handler returns, or a dict with ``code=-1``, ``reason``
        and ``detail`` when parsing, lookup or the handler itself fails.
    """
    try:
        # --- get sources ---
        parser = StartletteMultiPartParser(request.headers, request.stream())
        params = await parser.parse()
        sources = _collect_form_fields(params.multi_items())
        # --- clean ---
        # pop() honours the 'v1' default; the former get() + del raised
        # KeyError whenever the form carried no 'tag' field.
        tag = sources.pop('tag', 'v1')
        code = int(sources.pop('code', None))
        # --- set sources ---
        sources['g_user_id'] = user.get('uid')
        sources['g_user_name'] = user.get('username')
        sources['g_user_pass'] = user.get('password')
        sources['g_role_id'] = user.get('role_id')
        # --- set files ---
        files = {
            key: value
            for key, value in sources.items()
            if value.__class__.__name__ == 'UploadFile'
        }
        for key in files:
            del sources[key]
        sources['files'] = files
        # --- call action ---
        method = _get_method_by_code(code=code, tag=tag)
        if method is None:
            # The resolver returns None for unknown tag/code or a failed import.
            raise LookupError(f"no handler registered for {tag}/{code}")
        return await method(**sources)
    except Exception as exception:
        reason = exception.__class__.__name__
        detail = f"{methods.trace_log()}"
        methods.debug_log('api.upload', f"m-299: exception | {reason}")
        methods.debug_log('api.upload', f"m-299: traceback | {detail}")
        # NOTE(review): ``detail`` sends the traceback text to the client;
        # confirm this is acceptable outside of debugging.
        return dict(code=-1, reason=reason, detail=detail)
@router.get('/api')
async def download(request: Request, response: Response, user: dict = Depends(key_v1.login_required)):
    """Send back the report file recorded for the submitted ``instance_id``.

    ``instance_id`` is read from the request's multipart form body. The
    matching record is fetched with ``Global.mdb.get_one_by_id`` and its
    ``report`` value is used as the file path.

    Args:
        request: Incoming request carrying the multipart body.
        response: Not used by this handler.
        user: Dict produced by the ``key_v1.login_required`` dependency.

    Returns:
        A ``FileResponse`` for the report path, ``''`` when there is no
        record or no path, or a dict with ``code=-1``, ``reason`` and
        ``detail`` when anything raises.
    """
    try:
        form = await StartletteMultiPartParser(request.headers, request.stream()).parse()
        # NOTE(review): the first argument is an empty string; confirm which
        # collection/table get_one_by_id is expected to read.
        record = Global.mdb.get_one_by_id('', form.get('instance_id'))
        if record:
            path = record.get('report')
        else:
            path = ''
        if path:
            return FileResponse(path)
        return ''
    except Exception as exception:
        error_name = exception.__class__.__name__
        methods.debug_log('api.download.201', f"#exception: {error_name}")
        methods.debug_log('api.download.201', f"#traceback: {methods.trace_log()}")
        return {
            'code': -1,
            'reason': f"{error_name}",
            'detail': f"{methods.trace_log()}",
        }
|