# update: 2021-6-28-19
"""Small client for the Nessus scanner REST API, built on ``requests``."""
import copy
import logging
import re
import time

import requests

# The client talks to scanners with TLS verification disabled by default, so
# urllib3's InsecureRequestWarning is silenced for the whole process.
requests.packages.urllib3.disable_warnings()

# Template titles accepted by Api.get_template_id_by_title().
TEMPLATES = [
    'PCI Quarterly External Scan',
    'Host Discovery',
    'WannaCry Ransomware',
    'Intel AMT Security Bypass',
    'Basic Network Scan',
    'Credentialed Patch Audit',
    'Web Application Tests',
    'Malware Scan',
    'Mobile Device Scan',
    'MDM Config Audit',
    'Policy Compliance Auditing',
    'Internal PCI Network Scan',
    'Offline Config Audit',
    'Audit Cloud Infrastructure',
    'SCAP and OVAL Auditing',
    'Custom Scan',
    'Bash Shellshock Detection',
    'GHOST (glibc) Detection',
    'DROWN Detection',
    'Badlock Detection',
    'Shadow Brokers Scan',
    'Spectre and Meltdown',
    'Advanced Scan',
    'Advanced Dynamic Scan',
]

# Plugin families switched on in the default policy (order is preserved).
_ENABLED_PLUGIN_FAMILIES = (
    "SMTP problems",
    "Backdoors",
    "Ubuntu Local Security Checks",
    "Gentoo Local Security Checks",
    "Oracle Linux Local Security Checks",
    "RPC",
    "Gain a shell remotely",
    "Service detection",
    "DNS",
    "Mandriva Local Security Checks",
    "Junos Local Security Checks",
    "Misc.",
    "FTP",
    "Slackware Local Security Checks",
    "Default Unix Accounts",
    "AIX Local Security Checks",
    "SNMP",
    "OracleVM Local Security Checks",
    "CGI abuses",
    "Settings",
    "CISCO",
    "Firewalls",
    "Databases",
    "Debian Local Security Checks",
    "Fedora Local Security Checks",
    "Netware",
    "Huawei Local Security Checks",
    "Windows : User management",
    "VMware ESX Local Security Checks",
    "Virtuozzo Local Security Checks",
    "CentOS Local Security Checks",
    "Peer-To-Peer File Sharing",
    "NewStart CGSL Local Security Checks",
    "General",
    "Policy Compliance",
    "Amazon Linux Local Security Checks",
    "Solaris Local Security Checks",
    "F5 Networks Local Security Checks",
    "Denial of Service",
    "Windows : Microsoft Bulletins",
    "SuSE Local Security Checks",
    "Palo Alto Local Security Checks",
    "Red Hat Local Security Checks",
    "PhotonOS Local Security Checks",
    "HP-UX Local Security Checks",
    "CGI abuses : XSS",
    "FreeBSD Local Security Checks",
    "Windows",
    "Scientific Linux Local Security Checks",
    "MacOS X Local Security Checks",
    "Web Servers",
    "SCADA",
)

# Request body used as the starting point by Api.create_policy(); "uuid" and
# settings["name"] are overwritten per call on a private deep copy.
DefaultPolicyForAdvancedScan = {
    "uuid": "ad629e16-03b6-8c1d-cef6-ef8c9dd3c658d24bd260ef5f9e66",
    "plugins": {
        family: {"status": "enabled"} for family in _ENABLED_PLUGIN_FAMILIES
    },
    "credentials": {"add": {}, "edit": {}, "delete": []},
    "settings": {
        "patch_audit_over_telnet": "no",
        "patch_audit_over_rsh": "no",
        "patch_audit_over_rexec": "no",
        "snmp_port": "161",
        "additional_snmp_port1": "161",
        "additional_snmp_port2": "161",
        "additional_snmp_port3": "161",
        "http_login_method": "POST",
        "http_reauth_delay": "",
        "http_login_max_redir": "0",
        "http_login_invert_auth_regex": "no",
        "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no",
        "never_send_win_creds_in_the_clear": "yes",
        "dont_use_ntlmv1": "yes",
        "start_remote_registry": "no",
        "enable_admin_shares": "no",
        "ssh_known_hosts": "",
        "ssh_port": "22",
        "ssh_client_banner": "OpenSSH_5.0",
        "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes",
        "region_ord_pref_name": "yes",
        "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes",
        "region_syd_pref_name": "yes",
        "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "",
        "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "",
        "aws_us_east_2": "",
        "aws_us_west_1": "",
        "aws_us_west_2": "",
        "aws_ca_central_1": "",
        "aws_eu_west_1": "",
        "aws_eu_west_2": "",
        "aws_eu_west_3": "",
        "aws_eu_central_1": "",
        "aws_eu_north_1": "",
        "aws_ap_east_1": "",
        "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "",
        "aws_ap_northeast_3": "",
        "aws_ap_southeast_1": "",
        "aws_ap_southeast_2": "",
        "aws_ap_south_1": "",
        "aws_me_south_1": "",
        "aws_sa_east_1": "",
        "aws_use_https": "yes",
        "aws_verify_ssl": "yes",
        "log_whole_attack": "no",
        "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default",
        "include_kb": "use_scanner_default",
        "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "",
        "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no",
        "network_receive_timeout": "5",
        "max_checks_per_host": "5",
        "max_hosts_per_scan": "100",
        "max_simult_tcp_sessions_per_host": "",
        "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes",
        "stop_scan_on_disconnect": "no",
        "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes",
        "reverse_lookup": "no",
        "log_live_hosts": "no",
        "display_unreachable_hosts": "no",
        "report_verbosity": "Normal",
        "report_superseded_patches": "yes",
        "silent_dependencies": "yes",
        "scan_malware": "no",
        "samr_enumeration": "yes",
        "adsi_query": "yes",
        "wmi_query": "yes",
        "rid_brute_forcing": "no",
        "request_windows_domain_info": "no",
        "scan_webapps": "no",
        "test_default_oracle_accounts": "no",
        "provided_creds_only": "yes",
        "smtp_domain": "example.com",
        "smtp_from": "nobody@example.com",
        "smtp_to": "postmaster@[AUTO_REPLACED_IP]",
        "av_grace_period": "0",
        "report_paranoia": "Normal",
        "thorough_tests": "no",
        "svc_detection_on_all_ports": "yes",
        "detect_ssl": "yes",
        "ssl_prob_ports": "Known SSL ports",
        "cert_expiry_warning_days": "60",
        "enumerate_all_ciphers": "yes",
        "check_crl": "no",
        "tcp_scanner": "no",
        "tcp_firewall_detection": "Automatic (normal)",
        "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)",
        "udp_scanner": "no",
        "ssh_netstat_scanner": "yes",
        "wmi_netstat_scanner": "yes",
        "snmp_scanner": "yes",
        "only_portscan_if_enum_failed": "yes",
        "verify_open_ports": "no",
        "unscanned_closed": "no",
        "portscan_range": "default",
        "wol_mac_addresses": "",
        "wol_wait_time": "5",
        "scan_network_printers": "no",
        "scan_netware_hosts": "no",
        "scan_ot_devices": "no",
        "ping_the_remote_host": "yes",
        "arp_ping": "yes",
        "tcp_ping": "yes",
        "tcp_ping_dest_ports": "built-in",
        "icmp_ping": "yes",
        "icmp_unreach_means_host_down": "no",
        "icmp_ping_retries": "2",
        "udp_ping": "no",
        "test_local_nessus_host": "yes",
        "fast_network_discovery": "no",
        "name": "test-1122",
        "description": "",
    },
}


class Api(object):
    """Session-authenticated client for one Nessus service.

    Constructing an instance performs network I/O: it fetches the API token,
    creates the administrator account if the server is still in its
    ``register`` state, and logs in.

    Every HTTP call prints its URL and status code. Response bodies are
    accumulated as ``key:value;`` text in ``debugs`` (success) or ``errors``
    (status code above 300). Unless a method documents a different failure
    value, it returns ``None`` when the server answers with an error status.
    """

    # Class-level fallback so instances created without __init__ (or by
    # subclasses that skip it) keep the historical "no TLS verification".
    verify = False

    def __init__(self, service_url='https://47.104.160.37:8001',
                 username='admin', password='admin', verify=False):
        """Connect to ``service_url`` and log in as ``username``.

        Args:
            service_url: Base URL of the Nessus service, without trailing slash.
            username: Account used to log in (created first when the server
                is in the ``register`` state).
            password: Password for ``username``.
            verify: Passed to ``requests`` as ``verify``; the default
                ``False`` keeps TLS certificate verification disabled.
        """
        self.debugs = str()
        self.errors = str()
        self.urls = list()
        self.service_url = service_url
        self.verify = verify
        self.headers = dict()
        self.headers['content-type'] = 'application/json'
        self.headers['X-API-Token'] = self.get_api_token()
        self.create_super_user(username, password)
        token = self.get_session(username, password)
        self.headers['X-Cookie'] = f'token={token}'

    def debug(self, **kwargs):
        """Append each ``key:value;`` pair to ``self.debugs``."""
        self.debugs += ''.join(f'{k!s}:{v!s};' for k, v in kwargs.items())

    def error(self, **kwargs):
        """Append each ``key:value;`` pair to ``self.errors``."""
        self.errors += ''.join(f'{k!s}:{v!s};' for k, v in kwargs.items())

    def _request(self, http_method, url, label, track_url=True, record=True,
                 **kwargs):
        """Send one request and do the shared logging and bookkeeping.

        Args:
            http_method: HTTP verb, e.g. ``'get'`` or ``'post'``.
            url: Absolute URL to call.
            label: Name used in the printed log lines and in the
                ``debugs`` / ``errors`` records.
            track_url: Append ``url`` to ``self.urls`` on success.
            record: Store the response body via ``debug()`` / ``error()``.
            **kwargs: Extra arguments for ``requests.request`` (``json``,
                ``params``, ``timeout``).

        Returns:
            The response on success, ``None`` when the status code is
            above 300.
        """
        print(f'NessusApiLog:{label}:url:', url)
        response = requests.request(
            http_method, url, headers=self.headers, verify=self.verify,
            **kwargs)
        print(f'NessusApiLog:{label}:result:', response.status_code)
        if response.status_code > 300:
            if record:
                self.error(method=label, result=response.text)
            return None
        if track_url:
            self.urls.append(url)
        if record:
            self.debug(method=label, result=response.text)
        return response

    def get_policy_id_by_name(self, name):
        """Return the id of the policy called ``name``, or ``''`` if absent."""
        # get_policies() yields None when the request fails (and the JSON
        # list itself may be null), so fall back to an empty list.
        for policy in self.get_policies() or []:
            if policy['name'] == name:
                return policy['id']
        return ''

    def get_template_id_by_title(self, title):
        """Return the uuid of the policy template titled ``title``.

        Returns ``''`` when ``title`` is not listed in ``TEMPLATES`` or the
        server does not report a template with that title.
        """
        if title not in TEMPLATES:
            return ''
        for template in self.get_templates() or []:
            if template['title'] == title:
                return template['uuid']
        return ''

    def scan_is_completed(self, scan_id):
        """Return True when the scan's status is ``'completed'``."""
        return self.get_scan_status(scan_id) == 'completed'

    def create_super_user(self, username, password):
        """Create the administrator account on a server awaiting registration.

        Does nothing unless the server status is ``'register'``. Otherwise it
        creates the user, restarts the server, and blocks, polling every five
        seconds, until the status becomes ``'ready'``.
        """
        if self.server_status() != 'register':
            return
        self.create_user(username, password)
        self.server_restart()
        # Wait for the restart to finish. Any failure while polling (for
        # example a refused connection while the service is down) is
        # deliberately treated as "not ready yet": report it and retry.
        while True:
            try:
                status = self.server_status()
                if status == 'ready':
                    break
                print(status)
            except Exception as e:
                print(e.__class__.__name__)
            time.sleep(5)

    def create_user(self, username, password):
        """Create a user with permissions 128 via ``POST /users``.

        Returns the new user's id, or ``0`` when the server is not in the
        ``'register'`` state or the request fails.
        """
        if self.server_status() != 'register':
            return 0
        url = f'{self.service_url}/users'
        data = {
            'username': username,
            'password': password,
            'permissions': 128,
        }
        response = self._request('post', url, 'create_user', json=data)
        if response is None:
            return 0
        return response.json()['id']

    def server_restart(self):
        """Request a restart via ``POST /server/restart``; return success."""
        url = f'{self.service_url}/server/restart'
        response = self._request(
            'post', url, 'server_restart', track_url=False)
        return response is not None

    def server_status(self):
        """Return the status reported by ``GET /server/status``.

        Statuses noted by the original author: ``loading``, ``register``,
        ``ready``. Returns ``''`` on an error status code.
        """
        url = f'{self.service_url}/server/status'
        response = self._request(
            'get', url, 'server_status', track_url=False)
        if response is None:
            return ''
        return response.json()['status']

    def get_session(self, username, password):
        """Log in via ``POST /session`` and return the session token.

        Log lines and records keep the historical ``get_token`` label.
        """
        url = f'{self.service_url}/session'
        data = {
            'username': username,
            'password': password,
        }
        response = self._request('post', url, 'get_token', json=data)
        if response is None:
            return None
        return response.json()['token']

    def get_keys(self):
        """Call ``PUT /session/keys`` and return the decoded JSON body.

        Sent with the session headers so the request is authenticated.
        Returns ``{}`` on an error status code.
        """
        url = f'{self.service_url}/session/keys'
        response = self._request('put', url, 'get_keys', track_url=False)
        if response is None:
            return {}
        return response.json()

    def get_policy(self, policy_id):
        """Return the JSON body of ``GET /policies/{policy_id}``."""
        url = f'{self.service_url}/policies/{policy_id}'
        response = self._request('get', url, 'get_policy')
        if response is None:
            return None
        return response.json()

    def get_policies(self):
        """Return the ``policies`` entry of ``GET /policies``."""
        url = f'{self.service_url}/policies'
        response = self._request('get', url, 'get_policies', timeout=300)
        if response is None:
            return None
        return response.json()['policies']

    def get_scan_status(self, scan_id):
        """Return ``info.status`` from ``GET /scans/{scan_id}``."""
        url = f'{self.service_url}/scans/{scan_id}'
        params = {'includeHostDetailsForHostDiscovery': 'true', 'limit': 2500}
        response = self._request(
            'get', url, 'get_scan_status', params=params)
        if response is None:
            return None
        return response.json()['info']['status']

    def get_scan(self, scan_id):
        """Return the ``hosts`` entry of ``GET /scans/{scan_id}``."""
        url = f'{self.service_url}/scans/{scan_id}'
        params = {'includeHostDetailsForHostDiscovery': 'true', 'limit': 2500}
        response = self._request('get', url, 'get_scan', params=params)
        if response is None:
            return None
        return response.json()['hosts']

    def get_scans(self):
        """Return the ``scans`` entry of ``GET /scans``."""
        url = f'{self.service_url}/scans'
        response = self._request('get', url, 'get_scans')
        if response is None:
            return None
        return response.json()['scans']

    def create_scan(self, template_uuid, scan_name, policy_id, targets):
        """Create a scan via ``POST /scans`` and return its id.

        Args:
            template_uuid: Sent as the top-level ``uuid`` of the request.
            scan_name: Sent as ``settings.name``.
            policy_id: Sent, converted to ``str``, as ``settings.policy_id``.
            targets: Sent as ``settings.text_targets``.
        """
        url = f'{self.service_url}/scans'
        data = {
            "uuid": template_uuid,
            "settings": {
                "emails": "",
                "filter_type": "and",
                "filters": [],
                "launch_now": True,
                "enabled": False,
                "name": scan_name,
                "description": "",
                # "folder_id": 3,
                # "scanner_id": "1",
                "policy_id": str(policy_id),
                "text_targets": targets,
                "file_targets": "",
            },
        }
        response = self._request('post', url, 'create_scan', json=data)
        if response is None:
            return None
        return response.json()['scan']['id']

    def delete_scan(self, scan_id):
        """Delete a scan via ``DELETE /scans/{scan_id}``; return success."""
        url = f'{self.service_url}/scans/{scan_id}'
        response = self._request('delete', url, 'delete_scan')
        return response is not None

    def start_scan(self, scan_id):
        """Call ``POST /scans/{scan_id}/launch`` and return the JSON body."""
        url = f'{self.service_url}/scans/{scan_id}/launch'
        response = self._request('post', url, 'start_scan', json={})
        if response is None:
            return None
        return response.json()

    def get_host(self, scan_id, host_id):
        """Return the JSON body of ``GET /scans/{scan_id}/hosts/{host_id}``."""
        url = f'{self.service_url}/scans/{scan_id}/hosts/{host_id}'
        response = self._request('get', url, 'get_host')
        if response is None:
            return None
        return response.json()

    def get_plugin(self, scan_id, host_id, plugin_id):
        """Return the JSON body of
        ``GET /scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}``.
        """
        url = (f'{self.service_url}/scans/{scan_id}'
               f'/hosts/{host_id}/plugins/{plugin_id}')
        response = self._request('get', url, 'get_plugin')
        if response is None:
            return None
        return response.json()

    def get_plugin_info(self, plugin_id):
        """Return the JSON body of ``GET /plugins/plugin/{plugin_id}``."""
        url = f'{self.service_url}/plugins/plugin/{plugin_id}'
        response = self._request('get', url, 'get_plugin_info')
        if response is None:
            return None
        return response.json()

    def get_templates(self, template_type='policy'):
        """Return the ``templates`` entry of
        ``GET /editor/{template_type}/templates``.
        """
        url = f'{self.service_url}/editor/{template_type}/templates'
        response = self._request('get', url, 'get_templates')
        if response is None:
            return None
        return response.json()['templates']

    def create_policy(self, template_uuid, policy_name, settings=None):
        """Create a policy via ``POST /policies/`` and return its id.

        The request body is a copy of ``DefaultPolicyForAdvancedScan`` with
        ``uuid`` and ``settings.name`` replaced and ``settings`` (if given)
        merged over the default settings.

        Returns ``''`` on an error status code.
        """
        url = f'{self.service_url}/policies/'
        # Deep copy: the nested "settings" dict is modified below, and a
        # shallow copy would write those changes into the shared default.
        data = copy.deepcopy(DefaultPolicyForAdvancedScan)
        data['uuid'] = template_uuid
        data['settings']['name'] = policy_name
        if settings:
            data['settings'].update(settings)
        response = self._request('post', url, 'create_policy', json=data)
        if response is None:
            return ''
        return response.json()['policy_id']

    def get_api_token(self):
        """Extract the API token from the service's ``/nessus6.js`` script.

        Returns the first UUID-shaped value found after ``return"``, or
        ``''`` when the request fails or no such value is present. The
        response body is not stored in ``debugs`` / ``errors``.
        """
        url = f'{self.service_url}/nessus6.js'
        params = {'v': 1583846805284}
        response = self._request(
            'get', url, 'get_api_token', record=False, params=params)
        if response is None:
            return ''
        pattern = r'return"([0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12})'
        found = re.search(pattern, response.text)
        return found.group(1) if found else ''


if __name__ == '__main__':
    # api = Api(service_url='https://118.190.217.96:8001', username='admin', password='admin')
    # api = Api(service_url='https://47.104.224.202:8001', username='admin', password='admin')
    api = Api(service_url='https://172.30.2.8:8001', username='admin', password='admin')
    # --- test api ---
    # print(api.server_status())
    # print(api.server_restart())
    # print(api.server_status())
    print(api.get_policies())
    print(api.errors)
    # print(api.get_templates())
    # print(api.get_template_id_by_title('Advanced Scan'))