# update: 2024.7.14 """ 中冶宝钢智慧研究院接口 """ from urllib import parse import requests import time import traceback import sys import importlib # sys.path.append(r'C:\mccbts-ar-py\module-method-python') # for 10.10.61.22 sys.path.append(r'C:\mccbts-ar-py\module-method-python') # for 10.10.61.22 # sys.path.append(r'D:\mccbts-ar-py\module-method-python') # for 192.168.103.34 # methods = importlib.import_module(f"libraries.base_original") methods = importlib.import_module(f"xlib") class API(object): # def __init__(self, username='System001', password='Swdx@143', def __init__(self, username='', password=''): """ 接口地址: https://eaiops.mccbts.com.cn/cigpf-gateway/cigpf-auth/login 接口说明: 用户认证接口,mcctest/Mcc@1234 接口地址: http://ycpx04.mccbts.com.cn/cnki-gw/expert/expert/getExpertList 接口说明:获取专家列表 接口地址: http://10.8.0.151:8000/datachange/release/interface 接口说明:该接口可以获取用户数据、部门数据 """ # --- define --- self.cigpf_api_service = f"https://eaiops.mccbts.com.cn" self.cnki_api_service = f"http://ycpx04.mccbts.com.cn" self.datachange_api_service = f"http://10.8.0.151:8000" self.LIST_USER_TOKEN = 'FA460AD97660475BBD8E1CF31BC7A49B' self.LIST_DEPT_TOKEN = '4412B9724D254E38B57014CF99EB0312' def get_user_info(self, username, password): """ 获取用户信息 return { 'success': True, 'message': '操作成功!', 'code': 200, 'result': { 'id': '1399259233832144898', 'username': 'mcctest', 'realname': '测试账号', 'avatar': '/cigpfupload/05a5b1ab84a446c4a2e126da89b0c0af.png', 'birthday': None, 'sex': None, 'thirdId': '', 'thirdType': 'qt', 'email': None, 'phone': '18210635895', 'orgCode': '1', 'orgCodeTxt': None, 'status': 1, 'delFlag': 0, 'workNo': '99999', 'post': None, 'telephone': None, 'createBy': '', 'createTime': None, 'updateBy': '张宇', 'updateTime': '2023-11-07 06:00:09', 'activitiSync': 1, 'userIdentity': 2, 'departIds': '1000051C', 'relTenantIds': None, 'clientId': None, 'userType': '2' }, 'timestamp': 1699666081555 } """ url = f"{self.cigpf_api_service}/cigpf-gateway/cigpf-auth/login" # data = { # 'userName': 'mcctest', # 用户 # 'passWord': 'Mcc@1234', # 密码 # } data = { 'userName': username, # 用户 'passWord': password, # 密码 } # print(f"API.get_user_info:data: {data}") print('API.get_user_info.url:', url) response = requests.post(url, json=data) if response.status_code > 300: print('API.get_user_info.result:', response.status_code) print('API.get_user_info.detail:', response.text) return {} else: print('API.get_user_info.result:', response.status_code) return response.json() def get_expert_type_tree_list(self, q=''): """ 获取专家结构树 """ url = f"https://ycpx04.mccbts.com.cn/cnki-gw/expert/expertType/getExpertTypeTreeList" data = { 'keyword': q } response = requests.post(url, json=data) # print('API.test.url:', url) # print('API.test.data:', data) # print('API.test.code:', response.status_code) # print('API.test.text:', response.text) return response.json() def get_expert_list(self, page=1, rows=10): """ 获取专家列表 return { 'msg': '操作成功', 'code': 200, 'data': { 'total': 320, 'list': [ { 'id': 407880669122629, 'expertId': 'b46f7f7e-2883-4281-ba92-0c72fb30a668', 'expertCode': '40886', 'expertName': '冯浩川', 'expertLogo': '', 'sex': 1, 'birthday': '1974-08-12 00:00:00', 'education': None, 'degree': None, 'colleges': '', 'politicalOutlook': '', 'academicTitle': '', 'expertTypeId': 'a0846382-dc33-44f5-990d-ea042ab90cf4', 'expertType': '工程机械组', 'workUnitOrCurrentPosition': '', 'professionalAndTechnicalFields': '', 'mainSocialPositions': '', 'personalHonors': '', 'project': [], 'brief': '', 'createTime': '2023-04-17 15:30:43', 'createUserId': '6867b1b3-024f-4065-b991-3ebd48df7b27', 'type': '', 'companyId': '45b9801f-d2ff-4886-8076-7f7d8b403e37', 'company': '协力生产分公司', 'departmentId': '301e6551-dfd6-4e3e-b7f5-be36e14f84a1', 'department': '协力_机械维修中心_管理组' } ] } } """ url = f"{self.cnki_api_service}/cnki-gw/expert/expert/getExpertList" data = { 'pageDto': { 'page': page, 'rows': rows } } # print(f"API.get_user_info:data: {data}") print('API.get_expert_list.url:', url) response = requests.post(url, json=data) # --- check --- if response.status_code > 300: print('API.get_expert_list.result:', response.status_code) print('API.get_expert_list.detail:', response.text) return [] # --- check --- data = response.json() if data.get('code') != 200: return [] # --- check --- if not data: return [] # print(data.get('data').get('total')) return data.get('data').get('list') def get_staff_list(self): """ 获取员工列表 return [ { 'STATUS': '1', 'ORG_CODE': '1603605', 'UPDATE_BY': 'HR', 'CREATE_BY': 'HR', 'DEPART_NAME': '山西_天车二区_硅钢班', 'DEL_FLAG': '0', 'CREATE_TIME': 1659117651000, 'UPDATE_TIME': 1659117651000, 'ID': '1001G1AR', 'DEPART_NAME_ABBR': '山西_天车二区_硅钢班', 'C1': '1', 'PARENT_ID': '1001G1AM' } ] """ url = f"{self.datachange_api_service}/datachange/release/interface" headers = {'Chinasofti-Access-Token': self.LIST_USER_TOKEN} print('API.get_staff_list.url:', url) print('API.get_staff_list.headers:', headers) response = requests.get(url, headers=headers) # --- check --- if response.status_code > 300: print('API.get_staff_list.result:', response.status_code) print('API.get_staff_list.detail:', response.text) return '请求请求失败', f"code: {response.status_code}, response: {response.text}" # --- check --- data = response.json() if data.get('code') != 0: # print('API.get_staff_list.detail:', response.text) return '接口返回异常', f"请求地址: {url}, 请求方式: GET, 请求参数: {headers}, 返回结果: {response.text}" return data.get('data') def get_department_list(self): """ 获取部门列表 return [ { 'ORG_ID': '10012B4J', 'REALNAME': '张天勤', 'POST': '电焊工', 'PHONE': '15921009690', 'SEX': 1, 'UPDATE_BY': 'HR', 'DEL_FLAG': 0, 'UPDATE_TIME': 1661492148000, 'WORK_NO2': '983260', 'STATUS': '1', 'PKID': 'C1924F31EE850472E050080A5C00BCBA', 'ORG_CODE': '153420410', 'CREATE_BY': 'HR', 'CREATE_TIME': 1620205014000, 'ID': '10000RBZ', 'WORK_NO': '983260', 'BIRTHDAY': '1967-02-09' } ] """ url = f"{self.datachange_api_service}/datachange/release/interface" headers = {'Chinasofti-Access-Token': self.LIST_DEPT_TOKEN} print('API.get_department_list.url:', url) print('API.get_department_list.headers:', headers) response = requests.get(url, headers=headers) print('API.get_department_list.status_code:', response.status_code) print('API.get_department_list.text:', response.text) # --- check --- if response.status_code > 300: # print('API.get_department_list.result:', response.status_code) # print('API.get_department_list.detail:', response.text) return {'code': -1, 'data': response.text} # --- check --- data = response.json() if data.get('code') != 0: # print('API.get_department_list.code:', data.get('code')) # print('API.get_department_list.data:', response.text) return {'code': -1, 'data': response.text} return {'code': 0, 'data': data.get('data')} def test(self): """ """ url = f"http://ycpx04.mccbts.com.cn/cnki-gw/expert/expertType/getExpertTypeList" data = { 'keyword': 'xxx' } response = requests.post(url, json=data) print('API.test.url:', url) print('API.test.data:', data) print('API.test.code:', response.status_code) print('API.test.text:', response.text) # res = res.json() # data = { # 'count': len(res['data']), # 'data': res['data'] # } if __name__ == '__main__': # --- init --- api = API() # --- test --- items = api.get_staff_list() # items = api.get_department_list() print(items) # --- test --- # items = api.get_expert_list(page=1, rows=100000000) # for item in items: # if 'c0dbc699-1640-43c2-8459-ffadbaa2d966' not in item.get('expertTypeId'): # continue # print(item) # --- test --- # items = api.get_expert_type_tree_list() # print(items) # --- test --- 获取一级公司 # d1 = list() # items = api.get_department_list().get('data') # for item in items: # # if item.get('ORG_CODE') == '191': # # print(item) # # if '总部' in item.get('DEPART_NAME'): # # if '总部' in item.get('DEPART_NAME'): # # print(item) # if len(item.get('ORG_CODE')) == 3 and item.get('ORG_CODE')[0] == '1': # # if item.get('ORG_CODE') in ['3', '4', '6']: # # print(item) # # d1.append(item.get('DEPART_NAME')) # print(item.get('ORG_CODE'), item.get('DEPART_NAME')) # print(d1, len(d1)) # --- test --- # items = api.get_expert_type_tree_list().get('data') # print(items[0]['children']) # for i in items[0]['children']: # print(len(i.get('children')), i.get('count')) # if i.get('children'): # for j in i.get('children'): # print(len(j.get('children')), j.get('count'))