# update: 2021-6-28-19
import copy
import logging
import re
import time

import requests
# Silence urllib3 warnings for the whole process: every request in this module
# is sent with verify=False.
requests.packages.urllib3.disable_warnings()
# Template titles accepted by Api.get_template_id_by_title; any other title is
# rejected there before the server is queried.
TEMPLATES = [
    'PCI Quarterly External Scan',
    'Host Discovery',
    'WannaCry Ransomware',
    'Intel AMT Security Bypass',
    'Basic Network Scan',
    'Credentialed Patch Audit',
    'Web Application Tests',
    'Malware Scan',
    'Mobile Device Scan',
    'MDM Config Audit',
    'Policy Compliance Auditing',
    'Internal PCI Network Scan',
    'Offline Config Audit',
    'Audit Cloud Infrastructure',
    'SCAP and OVAL Auditing',
    'Custom Scan',
    'Bash Shellshock Detection',
    'GHOST (glibc) Detection',
    'DROWN Detection',
    'Badlock Detection',
    'Shadow Brokers Scan',
    'Spectre and Meltdown',
    'Advanced Scan',
    'Advanced Dynamic Scan',
]
# Request-body template used by Api.create_policy, which copies it and then
# overrides "uuid" and settings["name"] (plus any caller-supplied settings).
DefaultPolicyForAdvancedScan = {
    "uuid": "ad629e16-03b6-8c1d-cef6-ef8c9dd3c658d24bd260ef5f9e66",
    # Every listed plugin family is enabled; each family gets its own dict so
    # that editing one entry never affects another.
    "plugins": {
        family: {"status": "enabled"}
        for family in (
            "SMTP problems",
            "Backdoors",
            "Ubuntu Local Security Checks",
            "Gentoo Local Security Checks",
            "Oracle Linux Local Security Checks",
            "RPC",
            "Gain a shell remotely",
            "Service detection",
            "DNS",
            "Mandriva Local Security Checks",
            "Junos Local Security Checks",
            "Misc.",
            "FTP",
            "Slackware Local Security Checks",
            "Default Unix Accounts",
            "AIX Local Security Checks",
            "SNMP",
            "OracleVM Local Security Checks",
            "CGI abuses",
            "Settings",
            "CISCO",
            "Firewalls",
            "Databases",
            "Debian Local Security Checks",
            "Fedora Local Security Checks",
            "Netware",
            "Huawei Local Security Checks",
            "Windows : User management",
            "VMware ESX Local Security Checks",
            "Virtuozzo Local Security Checks",
            "CentOS Local Security Checks",
            "Peer-To-Peer File Sharing",
            "NewStart CGSL Local Security Checks",
            "General",
            "Policy Compliance",
            "Amazon Linux Local Security Checks",
            "Solaris Local Security Checks",
            "F5 Networks Local Security Checks",
            "Denial of Service",
            "Windows : Microsoft Bulletins",
            "SuSE Local Security Checks",
            "Palo Alto Local Security Checks",
            "Red Hat Local Security Checks",
            "PhotonOS Local Security Checks",
            "HP-UX Local Security Checks",
            "CGI abuses : XSS",
            "FreeBSD Local Security Checks",
            "Windows",
            "Scientific Linux Local Security Checks",
            "MacOS X Local Security Checks",
            "Web Servers",
            "SCADA",
        )
    },
    "credentials": {"add": {}, "edit": {}, "delete": []},
    "settings": {
        "patch_audit_over_telnet": "no", "patch_audit_over_rsh": "no", "patch_audit_over_rexec": "no",
        "snmp_port": "161", "additional_snmp_port1": "161", "additional_snmp_port2": "161",
        "additional_snmp_port3": "161", "http_login_method": "POST", "http_reauth_delay": "",
        "http_login_max_redir": "0", "http_login_invert_auth_regex": "no", "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no", "never_send_win_creds_in_the_clear": "yes",
        "dont_use_ntlmv1": "yes", "start_remote_registry": "no", "enable_admin_shares": "no", "ssh_known_hosts": "",
        "ssh_port": "22", "ssh_client_banner": "OpenSSH_5.0", "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes", "region_ord_pref_name": "yes", "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes", "region_syd_pref_name": "yes", "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "", "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "", "aws_us_east_2": "", "aws_us_west_1": "", "aws_us_west_2": "",
        "aws_ca_central_1": "", "aws_eu_west_1": "", "aws_eu_west_2": "", "aws_eu_west_3": "",
        "aws_eu_central_1": "", "aws_eu_north_1": "", "aws_ap_east_1": "", "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "", "aws_ap_northeast_3": "", "aws_ap_southeast_1": "", "aws_ap_southeast_2": "",
        "aws_ap_south_1": "", "aws_me_south_1": "", "aws_sa_east_1": "", "aws_use_https": "yes",
        "aws_verify_ssl": "yes", "log_whole_attack": "no", "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default", "include_kb": "use_scanner_default", "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "", "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no", "network_receive_timeout": "5", "max_checks_per_host": "5",
        "max_hosts_per_scan": "100", "max_simult_tcp_sessions_per_host": "", "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes", "stop_scan_on_disconnect": "no", "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes", "reverse_lookup": "no", "log_live_hosts": "no",
        "display_unreachable_hosts": "no", "report_verbosity": "Normal", "report_superseded_patches": "yes",
        "silent_dependencies": "yes", "scan_malware": "no", "samr_enumeration": "yes", "adsi_query": "yes",
        "wmi_query": "yes", "rid_brute_forcing": "no", "request_windows_domain_info": "no",
        "scan_webapps": "no", "test_default_oracle_accounts": "no", "provided_creds_only": "yes",
        "smtp_domain": "example.com", "smtp_from": "nobody@example.com", "smtp_to": "postmaster@[AUTO_REPLACED_IP]",
        "av_grace_period": "0", "report_paranoia": "Normal", "thorough_tests": "no",
        "svc_detection_on_all_ports": "yes", "detect_ssl": "yes", "ssl_prob_ports": "Known SSL ports",
        "cert_expiry_warning_days": "60", "enumerate_all_ciphers": "yes", "check_crl": "no",
        "tcp_scanner": "no", "tcp_firewall_detection": "Automatic (normal)", "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)", "udp_scanner": "no", "ssh_netstat_scanner": "yes",
        "wmi_netstat_scanner": "yes", "snmp_scanner": "yes", "only_portscan_if_enum_failed": "yes",
        "verify_open_ports": "no", "unscanned_closed": "no", "portscan_range": "default",
        "wol_mac_addresses": "", "wol_wait_time": "5", "scan_network_printers": "no", "scan_netware_hosts": "no",
        "scan_ot_devices": "no", "ping_the_remote_host": "yes", "arp_ping": "yes", "tcp_ping": "yes",
        "tcp_ping_dest_ports": "built-in", "icmp_ping": "yes", "icmp_unreach_means_host_down": "no",
        "icmp_ping_retries": "2", "udp_ping": "no", "test_local_nessus_host": "yes",
        "fast_network_discovery": "no", "name": "test-1122", "description": ""
    }
}
class Api(object):
    """Small client for the Nessus HTTP API.

    Certificate verification is disabled on every request (``verify=False``).
    Each endpoint wrapper prints the URL it calls and the status code it gets
    back. A status code above 300 counts as a failure: unless a method notes
    otherwise, the response body is appended to ``self.errors`` and the
    wrapper returns an empty value of its usual return type. On success the
    body is appended to ``self.debugs`` and most wrappers also record the URL
    in ``self.urls``.
    """

    def __init__(self, service_url='https://47.104.160.37:8001', username='admin', password='admin'):
        """Connect to *service_url* and log in.

        Construction performs network I/O: it reads the API token from
        ``nessus6.js``, creates the admin user when the server reports the
        ``register`` status, then opens a session and stores its token in the
        ``X-Cookie`` header used by all later calls.
        """
        self.debugs = ''
        self.errors = ''
        self.urls = []
        self.service_url = service_url
        self.headers = {'content-type': 'application/json'}
        self.headers['X-API-Token'] = self.get_api_token()
        self.create_super_user(username, password)
        token = self.get_session(username, password)
        self.headers['X-Cookie'] = f'token={token}'

    # ------------------------------------------------------------------
    # logging helpers
    # ------------------------------------------------------------------
    def debug(self, **kwargs):
        """Append each ``key:value;`` pair to ``self.debugs``."""
        self.debugs += ''.join(f'{key}:{value};' for key, value in kwargs.items())

    def error(self, **kwargs):
        """Append each ``key:value;`` pair to ``self.errors``."""
        self.errors += ''.join(f'{key}:{value};' for key, value in kwargs.items())

    def _call(self, label, verb, url, headers=None, **options):
        """Send one HTTP request, printing the URL before and the status code after.

        *headers* defaults to ``self.headers``; *options* (``json``,
        ``params``, ``timeout`` ...) are passed straight to ``requests``.
        """
        print(f'NessusApiLog:{label}:url:', url)
        response = requests.request(
            verb,
            url,
            headers=self.headers if headers is None else headers,
            verify=False,
            **options,
        )
        print(f'NessusApiLog:{label}:result:', response.status_code)
        return response

    def _succeeded(self, label, response, url=None):
        """Record the outcome of *response* and return True when it succeeded.

        A status code above 300 is logged through ``error`` and yields False.
        Otherwise the body is logged through ``debug`` and, when *url* is
        given, the URL is appended to ``self.urls``.
        """
        if response.status_code > 300:
            self.error(method=label, result=response.text)
            return False
        if url is not None:
            self.urls.append(url)
        self.debug(method=label, result=response.text)
        return True

    # ------------------------------------------------------------------
    # convenience lookups
    # ------------------------------------------------------------------
    def get_policy_id_by_name(self, name):
        """Return the id of the first policy named *name*, or '' if there is none."""
        for policy in self.get_policies():
            if policy['name'] == name:
                return policy['id']
        return ''

    def get_template_id_by_title(self, title):
        """Return the uuid of the policy template titled *title*, or ''.

        Titles that are not listed in ``TEMPLATES`` are rejected without
        contacting the server.
        """
        if title not in TEMPLATES:
            return ''
        for template in self.get_templates():
            if template['title'] == title:
                return template['uuid']
        return ''

    def scan_is_completed(self, scan_id):
        """Return True when the scan's status is ``'completed'``."""
        return self.get_scan_status(scan_id) == 'completed'

    # ------------------------------------------------------------------
    # server bootstrap
    # ------------------------------------------------------------------
    def create_super_user(self, username, password):
        """Create the admin user on a server that is still in ``register`` state.

        Does nothing unless the server status is ``register``. Otherwise it
        creates the user, restarts the server and then blocks, polling every
        five seconds, until the status becomes ``ready``. There is no upper
        bound on the wait.
        """
        if self.server_status() != 'register':
            return
        self.create_user(username, password)
        self.server_restart()
        while True:
            try:
                status = self.server_status()
            except Exception as exc:
                # Deliberately broad: any failure while polling the restarting
                # server is reported by class name and retried.
                print(exc.__class__.__name__)
            else:
                if status == 'ready':
                    break
                print(status)
            time.sleep(5)

    def create_user(self, username, password):
        """Create a user via ``POST /users`` and return its id.

        Returns 0 when the server is not in ``register`` state or when the
        request fails.
        """
        if self.server_status() != 'register':
            return 0
        url = f'{self.service_url}/users'
        payload = {
            'username': username,
            'password': password,
            'permissions': 128,
        }
        response = self._call('create_user', 'post', url, json=payload)
        if not self._succeeded('create_user', response, url):
            return 0
        return response.json()['id']

    def server_restart(self):
        """Call ``POST /server/restart``; return True on success, False on failure."""
        url = f'{self.service_url}/server/restart'
        response = self._call('server_restart', 'post', url)
        return self._succeeded('server_restart', response)

    def server_status(self):
        """Return the ``status`` field of ``GET /server/status``, or '' on failure.

        The original docstring lists the statuses ``loading``, ``register``
        and ``ready``.
        """
        url = f'{self.service_url}/server/status'
        response = self._call('server_status', 'get', url)
        if not self._succeeded('server_status', response):
            return ''
        return response.json()['status']

    # ------------------------------------------------------------------
    # session
    # ------------------------------------------------------------------
    def get_session(self, username, password):
        """Log in via ``POST /session`` and return the session token ('' on failure)."""
        url = f'{self.service_url}/session'
        credentials = {
            'username': username,
            'password': password,
        }
        # The log label stays 'get_token' so existing log output is unchanged.
        response = self._call('get_token', 'post', url, json=credentials)
        if not self._succeeded('get_token', response, url):
            return ''
        return response.json()['token']

    def get_keys(self):
        """Call ``PUT /session/keys`` and return the JSON body ({} on failure).

        NOTE(review): this request sends only a content-type header rather
        than ``self.headers``, so it carries neither the API token nor the
        session cookie - confirm that this is intended.
        """
        url = f'{self.service_url}/session/keys'
        response = self._call('get_keys', 'put', url, headers={'content-type': 'application/json'})
        if not self._succeeded('get_keys', response):
            return {}
        return response.json()

    def get_api_token(self):
        """Return the API token embedded in ``/nessus6.js``, or '' if unavailable.

        Unlike the other wrappers this one writes nothing to ``self.debugs``
        or ``self.errors``; it only records the URL on success.
        """
        url = f'{self.service_url}/nessus6.js'
        response = self._call('get_api_token', 'get', url, params={'v': 1583846805284})
        if response.status_code > 300:
            return ''
        self.urls.append(url)
        return self._extract_api_token(response.text)

    @staticmethod
    def _extract_api_token(script_text):
        """Return the first UUID that follows ``return"`` in *script_text*, or ''."""
        pattern = r'return"([0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12})'
        found = re.search(pattern, script_text)
        return found.group(1) if found else ''

    # ------------------------------------------------------------------
    # policies and templates
    # ------------------------------------------------------------------
    def get_policy(self, policy_id):
        """Return the JSON body of ``GET /policies/<policy_id>`` ({} on failure)."""
        url = f'{self.service_url}/policies/{policy_id}'
        response = self._call('get_policy', 'get', url)
        if not self._succeeded('get_policy', response, url):
            return {}
        return response.json()

    def get_policies(self):
        """Return the ``policies`` list of ``GET /policies`` ([] on failure).

        This is the only call made with an explicit timeout (300 seconds).
        """
        url = f'{self.service_url}/policies'
        response = self._call('get_policies', 'get', url, timeout=300)
        if not self._succeeded('get_policies', response, url):
            return []
        return response.json()['policies']

    def get_templates(self, template_type='policy'):
        """Return the ``templates`` list of ``GET /editor/<template_type>/templates``.

        Returns [] on failure.
        """
        url = f'{self.service_url}/editor/{template_type}/templates'
        response = self._call('get_templates', 'get', url, params={})
        if not self._succeeded('get_templates', response, url):
            return []
        return response.json()['templates']

    def create_policy(self, template_uuid, policy_name, settings=None):
        """Create a policy via ``POST /policies/`` and return its ``policy_id``.

        The request body is a private copy of ``DefaultPolicyForAdvancedScan``
        with ``uuid`` and ``settings['name']`` replaced and the optional
        *settings* mapping merged into its settings. Returns '' on failure.
        """
        url = f'{self.service_url}/policies/'
        payload = self._build_policy_payload(template_uuid, policy_name, settings)
        response = self._call('create_policy', 'post', url, json=payload)
        if not self._succeeded('create_policy', response, url):
            return ''
        return response.json()['policy_id']

    @staticmethod
    def _build_policy_payload(template_uuid, policy_name, settings=None, base=None):
        """Return a fresh policy request body derived from *base*.

        *base* defaults to ``DefaultPolicyForAdvancedScan``. It is deep-copied
        because the body's nested ``settings`` dict is modified here; a
        shallow copy would write those changes into the shared default.
        """
        payload = copy.deepcopy(DefaultPolicyForAdvancedScan if base is None else base)
        payload['uuid'] = template_uuid
        payload['settings']['name'] = policy_name
        if settings:
            payload['settings'].update(settings)
        return payload

    # ------------------------------------------------------------------
    # scans
    # ------------------------------------------------------------------
    def _scan_details(self, scan_id):
        """Return the JSON body of ``GET /scans/<scan_id>``, or None on failure."""
        url = f'{self.service_url}/scans/{scan_id}'
        query = {'includeHostDetailsForHostDiscovery': 'true', 'limit': 2500}
        response = self._call('get_scan', 'get', url, params=query)
        if not self._succeeded('get_scan', response, url):
            return None
        return response.json()

    def get_scan_status(self, scan_id):
        """Return ``info.status`` of the scan, or '' when the request fails."""
        details = self._scan_details(scan_id)
        if details is None:
            return ''
        return details['info']['status']

    def get_scan(self, scan_id):
        """Return the ``hosts`` entry of the scan, or [] when the request fails."""
        details = self._scan_details(scan_id)
        if details is None:
            return []
        return details['hosts']

    def get_scans(self):
        """Return the ``scans`` entry of ``GET /scans`` ([] on failure)."""
        url = f'{self.service_url}/scans'
        response = self._call('get_scans', 'get', url)
        if not self._succeeded('get_scans', response, url):
            return []
        return response.json()['scans']

    def create_scan(self, template_uuid, scan_name, policy_id, targets):
        """Create a scan via ``POST /scans`` and return its id ('' on failure).

        The scan is created with ``launch_now`` set to True and ``enabled``
        set to False; *targets* is sent as ``text_targets`` and *policy_id*
        is sent as a string.
        """
        url = f'{self.service_url}/scans'
        payload = {
            "uuid": template_uuid,
            "settings": {
                "emails": "",
                "filter_type": "and",
                "filters": [],
                "launch_now": True,
                "enabled": False,
                "name": scan_name,
                "description": "",
                # "folder_id" and "scanner_id" are intentionally not sent.
                "policy_id": str(policy_id),
                "text_targets": targets,
                "file_targets": ""
            }
        }
        response = self._call('create_scan', 'post', url, json=payload)
        if not self._succeeded('create_scan', response, url):
            return ''
        return response.json()['scan']['id']

    def delete_scan(self, scan_id):
        """Call ``DELETE /scans/<scan_id>``; return True on success, False on failure."""
        url = f'{self.service_url}/scans/{scan_id}'
        response = self._call('delete_scan', 'delete', url)
        return self._succeeded('delete_scan', response, url)

    def start_scan(self, scan_id):
        """Call ``POST /scans/<scan_id>/launch`` and return the JSON body ({} on failure)."""
        url = f'{self.service_url}/scans/{scan_id}/launch'
        response = self._call('start_scan', 'post', url, json={})
        if not self._succeeded('start_scan', response, url):
            return {}
        return response.json()

    # ------------------------------------------------------------------
    # scan results and plugins
    # ------------------------------------------------------------------
    def get_host(self, scan_id, host_id):
        """Return the JSON body of ``GET /scans/<scan_id>/hosts/<host_id>`` ({} on failure)."""
        url = f'{self.service_url}/scans/{scan_id}/hosts/{host_id}'
        response = self._call('get_host', 'get', url, params={})
        if not self._succeeded('get_host', response, url):
            return {}
        return response.json()

    def get_plugin(self, scan_id, host_id, plugin_id):
        """Return the JSON body of ``GET /scans/<scan_id>/hosts/<host_id>/plugins/<plugin_id>``.

        Returns {} on failure.
        """
        url = f'{self.service_url}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}'
        response = self._call('get_plugin', 'get', url, params={})
        if not self._succeeded('get_plugin', response, url):
            return {}
        return response.json()

    def get_plugin_info(self, plugin_id):
        """Return the JSON body of ``GET /plugins/plugin/<plugin_id>`` ({} on failure)."""
        url = f'{self.service_url}/plugins/plugin/{plugin_id}'
        response = self._call('get_plugin_info', 'get', url, params={})
        if not self._succeeded('get_plugin_info', response, url):
            return {}
        return response.json()
if __name__ == '__main__':
    # Manual smoke test against a live server: log in, list the policies and
    # print whatever errors were collected along the way.
    # Other servers this was run against:
    # api = Api(service_url='https://118.190.217.96:8001', username='admin', password='admin')
    # api = Api(service_url='https://47.104.224.202:8001', username='admin', password='admin')
    api = Api(service_url='https://172.30.2.8:8001', username='admin', password='admin')
    # --- test api ---
    # print(api.server_status())
    # print(api.server_restart())
    # print(api.server_status())
    print(api.get_policies())
    print(api.errors)
    # print(api.get_templates())
    # print(api.get_template_id_by_title('Advanced Scan'))