| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 | # update: 2021-6-28-19import requestsimport loggingimport timeimport copyimport rerequests.packages.urllib3.disable_warnings()TEMPLATES = ['PCI Quarterly External Scan', 'Host Discovery', 'WannaCry Ransomware', 'Intel AMT Security Bypass',             'Basic Network Scan', 'Credentialed Patch Audit', 'Web Application Tests', 'Malware Scan',             'Mobile Device Scan', 'MDM Config Audit', 'Policy Compliance Auditing', 'Internal PCI Network Scan',             'Offline Config Audit', 'Audit Cloud Infrastructure', 'SCAP and OVAL Auditing', 'Custom Scan',             'Bash Shellshock Detection', 'GHOST (glibc) Detection', 'DROWN Detection', 'Badlock Detection',             'Shadow Brokers Scan', 'Spectre and Meltdown', 'Advanced Scan', 'Advanced Dynamic Scan']DefaultPolicyForAdvancedScan = {    "uuid": "ad629e16-03b6-8c1d-cef6-ef8c9dd3c658d24bd260ef5f9e66",    "plugins": {        "SMTP problems": {"status": "enabled"},        "Backdoors": {"status": "enabled"},        "Ubuntu Local Security Checks": {"status": "enabled"},        "Gentoo Local Security Checks": {"status": "enabled"},        "Oracle Linux Local Security Checks": {"status": "enabled"},        "RPC": {"status": "enabled"},        "Gain a shell remotely": {"status": "enabled"},        "Service detection": {"status": "enabled"},        "DNS": {"status": "enabled"},        "Mandriva Local Security Checks": {"status": "enabled"},        "Junos Local Security Checks": {"status": "enabled"},        "Misc.": {"status": "enabled"},        "FTP": {"status": "enabled"},        "Slackware Local Security Checks": {"status": "enabled"},        "Default Unix Accounts": {"status": "enabled"},        "AIX Local Security Checks": {"status": "enabled"},        "SNMP": {"status": "enabled"},        "OracleVM Local Security Checks": {"status": "enabled"},        "CGI abuses": {"status": "enabled"},        "Settings": {"status": "enabled"},        "CISCO": {"status": "enabled"},        "Firewalls": {"status": "enabled"},        "Databases": {"status": "enabled"},        "Debian Local Security Checks": {"status": "enabled"},        "Fedora Local Security Checks": {"status": "enabled"},        "Netware": {"status": "enabled"},        "Huawei Local Security Checks": {"status": "enabled"},        "Windows : User management": {"status": "enabled"},        "VMware ESX Local Security Checks": {"status": "enabled"},        "Virtuozzo Local Security Checks": {"status": "enabled"},        "CentOS Local Security Checks": {"status": "enabled"},        "Peer-To-Peer File Sharing": {"status": "enabled"},        "NewStart CGSL Local Security Checks": {"status": "enabled"},        "General": {"status": "enabled"},        "Policy Compliance": {"status": "enabled"},        "Amazon Linux Local Security Checks": {"status": "enabled"},        "Solaris Local Security Checks": {"status": "enabled"},        "F5 Networks Local Security Checks": {"status": "enabled"},        "Denial of Service": {"status": "enabled"},        "Windows : Microsoft Bulletins": {"status": "enabled"},        "SuSE Local Security Checks": {"status": "enabled"},        "Palo Alto Local Security Checks": {"status": "enabled"},        "Red Hat Local Security Checks": {"status": "enabled"},        "PhotonOS Local Security Checks": {"status": "enabled"},        "HP-UX Local Security Checks": {"status": "enabled"},        "CGI abuses : XSS": {"status": "enabled"},        "FreeBSD Local Security Checks": {"status": "enabled"},        "Windows": {"status": "enabled"},        "Scientific Linux Local Security Checks": {"status": "enabled"},        "MacOS X Local Security Checks": {"status": "enabled"},        "Web Servers": {"status": "enabled"},        "SCADA": {"status": "enabled"}    },    "credentials": {"add": {}, "edit": {}, "delete": []},    "settings": {        "patch_audit_over_telnet": "no", "patch_audit_over_rsh": "no", "patch_audit_over_rexec": "no",        "snmp_port": "161", "additional_snmp_port1": "161", "additional_snmp_port2": "161",        "additional_snmp_port3": "161", "http_login_method": "POST", "http_reauth_delay": "",        "http_login_max_redir": "0", "http_login_invert_auth_regex": "no", "http_login_auth_regex_on_headers": "no",        "http_login_auth_regex_nocase": "no", "never_send_win_creds_in_the_clear": "yes",        "dont_use_ntlmv1": "yes", "start_remote_registry": "no", "enable_admin_shares": "no", "ssh_known_hosts": "",        "ssh_port": "22", "ssh_client_banner": "OpenSSH_5.0", "attempt_least_privilege": "no",        "region_dfw_pref_name": "yes", "region_ord_pref_name": "yes", "region_iad_pref_name": "yes",        "region_lon_pref_name": "yes", "region_syd_pref_name": "yes", "region_hkg_pref_name": "yes",        "microsoft_azure_subscriptions_ids": "", "aws_ui_region_type": "Rest of the World",        "aws_us_east_1": "", "aws_us_east_2": "", "aws_us_west_1": "", "aws_us_west_2": "",        "aws_ca_central_1": "", "aws_eu_west_1": "", "aws_eu_west_2": "", "aws_eu_west_3": "",        "aws_eu_central_1": "", "aws_eu_north_1": "", "aws_ap_east_1": "", "aws_ap_northeast_1": "",        "aws_ap_northeast_2": "", "aws_ap_northeast_3": "", "aws_ap_southeast_1": "", "aws_ap_southeast_2": "",        "aws_ap_south_1": "", "aws_me_south_1": "", "aws_sa_east_1": "", "aws_use_https": "yes",        "aws_verify_ssl": "yes", "log_whole_attack": "no", "enable_plugin_debugging": "no",        "audit_trail": "use_scanner_default", "include_kb": "use_scanner_default", "enable_plugin_list": "no",        "custom_find_filepath_exclusions": "", "custom_find_filesystem_exclusions": "",        "reduce_connections_on_congestion": "no", "network_receive_timeout": "5", "max_checks_per_host": "5",        "max_hosts_per_scan": "100", "max_simult_tcp_sessions_per_host": "", "max_simult_tcp_sessions_per_scan": "",        "safe_checks": "yes", "stop_scan_on_disconnect": "no", "slice_network_addresses": "no",        "allow_post_scan_editing": "yes", "reverse_lookup": "no", "log_live_hosts": "no",        "display_unreachable_hosts": "no", "report_verbosity": "Normal", "report_superseded_patches": "yes",        "silent_dependencies": "yes", "scan_malware": "no", "samr_enumeration": "yes", "adsi_query": "yes",        "wmi_query": "yes", "rid_brute_forcing": "no", "request_windows_domain_info": "no",        "scan_webapps": "no", "test_default_oracle_accounts": "no", "provided_creds_only": "yes",        "smtp_domain": "example.com", "smtp_from": "nobody@example.com", "smtp_to": "postmaster@[AUTO_REPLACED_IP]",        "av_grace_period": "0", "report_paranoia": "Normal", "thorough_tests": "no",        "svc_detection_on_all_ports": "yes", "detect_ssl": "yes", "ssl_prob_ports": "Known SSL ports",        "cert_expiry_warning_days": "60", "enumerate_all_ciphers": "yes", "check_crl": "no",        "tcp_scanner": "no", "tcp_firewall_detection": "Automatic (normal)", "syn_scanner": "yes",        "syn_firewall_detection": "Automatic (normal)", "udp_scanner": "no", "ssh_netstat_scanner": "yes",        "wmi_netstat_scanner": "yes", "snmp_scanner": "yes", "only_portscan_if_enum_failed": "yes",        "verify_open_ports": "no", "unscanned_closed": "no", "portscan_range": "default",        "wol_mac_addresses": "", "wol_wait_time": "5", "scan_network_printers": "no", "scan_netware_hosts": "no",        "scan_ot_devices": "no", "ping_the_remote_host": "yes", "arp_ping": "yes", "tcp_ping": "yes",        "tcp_ping_dest_ports": "built-in", "icmp_ping": "yes", "icmp_unreach_means_host_down": "no",        "icmp_ping_retries": "2", "udp_ping": "no", "test_local_nessus_host": "yes",        "fast_network_discovery": "no", "name": "test-1122", "description": ""    }}class Api(object):    def __init__(self, service_url='https://47.104.160.37:8001', username='admin', password='admin'):        self.debugs = str()        self.errors = str()        self.urls = list()        self.service_url = service_url        self.headers = dict()        self.headers['content-type'] = 'application/json'        self.headers['X-API-Token'] = self.get_api_token()        self.create_super_user(username, password)        token = self.get_session(username, password)        self.headers['X-Cookie'] = f'token={token}'    def debug(self, **kwargs):        for k, v in kwargs.items():            self.debugs += str(k) + ':' + str(v) + ';'    def error(self, **kwargs):        for k, v in kwargs.items():            self.errors += str(k) + ':' + str(v) + ';'    def get_policy_id_by_name(self, name):        """根据策略名称获取策略id"""        for policy in self.get_policies():            if policy['name'] != name:                continue            return policy['id']        return ''    def get_template_id_by_title(self, title):        """根据策略类型标题获取策略类型id"""        if title not in TEMPLATES:            return ''        for template in self.get_templates():            if template['title'] != title:                continue            return template['uuid']        return ''    def scan_is_completed(self, scan_id):        """检查执行状态"""        return self.get_scan_status(scan_id) == 'completed'    def create_super_user(self, username, password):        """创建超级管理员用户"""        # --- 是否为register ---        if self.server_status() != 'register':            return        self.create_user(username, password)        self.server_restart()        # --- 等待loading结束 ---        while True:            try:                if self.server_status() == 'ready':                    break                print(self.server_status())                time.sleep(5)            except Exception as e:                print(e.__class__.__name__)                time.sleep(5)    def create_user(self, username, password):        """        url: https://<ip地址>:8834/users        """        if 'register' != self.server_status():            return 0        url = f'{self.service_url}/users'        data = {            'username': username,            'password': password,            'permissions': 128,        }        print('NessusApiLog:create_user:url:', url)        response = requests.post(url=url, json=data, headers=self.headers, verify=False)        print('NessusApiLog:create_user:result:', response.status_code)        if response.status_code > 300:            self.error(method='create_user', result=response.text)            return 0        else:            self.urls.append(url)            self.debug(method='create_user', result=response.text)            return response.json()['id']    def server_restart(self):        """        url: https://<ip地址>:8834/server/restart        """        url = f'{self.service_url}/server/restart'        print('NessusApiLog:server_restart:url:', url)        response = requests.post(url=url, headers=self.headers, verify=False)        print('NessusApiLog:server_restart:result:', response.status_code)        if response.status_code > 300:            self.error(method='server_restart', result=response.text)            return False        else:            self.debug(method='server_restart', result=response.text)            return True    def server_status(self):        """        url: https://118.190.217.96:8001/server/status        status:            loading            register            ready        """        url = f'{self.service_url}/server/status'        print('NessusApiLog:server_status:url:', url)        response = requests.get(url=url, headers=self.headers, verify=False)        print('NessusApiLog:server_status:result:', response.status_code)        if response.status_code > 300:            self.error(method='restart', result=response.text)            return ''        else:            self.debug(method='restart', result=response.text)            return response.json()['status']    def get_session(self, username, password):        """        url: http://<ip地址>:8834/session        """        url = f'{self.service_url}/session'        data = {            'username': username,            'password': password,        }        print('NessusApiLog:get_token:url:', url)        response = requests.post(url=url, json=data, headers=self.headers, verify=False)        print('NessusApiLog:get_token:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_token', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_token', result=response.text)            return response.json()['token']    def get_keys(self):        """        url: http://<ip地址>:8834/session/keys        doc: https://47.104.160.37:8001/api#/resources/session/keys        """        url = f'{self.service_url}/session/keys'        print('NessusApiLog:get_keys:url:', url)        response = requests.put(url=url, headers={'content-type': 'application/json'}, verify=False)        print('NessusApiLog:get_keys:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_keys', result=response.text)            return {}        else:            self.debug(method='get_keys', result=response.text)            return response.json()    def get_policy(self, policy_id):        """        url: http://<ip地址>:8834/policies/<policy_id>        """        url = f'{self.service_url}/policies/{policy_id}'        print('NessusApiLog:get_policy:url:', url)        response = requests.get(url, headers=self.headers, verify=False)        print('NessusApiLog:get_policy:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_policy', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_policy', result=response.text)            return response.json()    def get_policies(self):        """        url: http://<ip地址>:8834/policies        """        url = f'{self.service_url}/policies'        print('NessusApiLog:get_policies:url:', url)        response = requests.get(url, headers=self.headers, verify=False, timeout=300)        print('NessusApiLog:get_policies:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_policies', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_policies', result=response.text)            return response.json()['policies']    def get_scan_status(self, scan_id):        """        url: http://<ip地址>:8834/scans/<scan_id>        """        url = f'{self.service_url}/scans/{scan_id}'        params = {'includeHostDetailsForHostDiscovery': 'true', 'limit': 2500}        print('NessusApiLog:get_scan:url:', url)        response = requests.get(url, headers=self.headers, params=params, verify=False)        print('NessusApiLog:get_scan:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_scan', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_scan', result=response.text)            return response.json()['info']['status']    def get_scan(self, scan_id):        """        url: http://<ip地址>:8834/scans/<scan_id>        """        url = f'{self.service_url}/scans/{scan_id}'        params = {'includeHostDetailsForHostDiscovery': 'true', 'limit': 2500}        print('NessusApiLog:get_scan:url:', url)        response = requests.get(url, headers=self.headers, params=params, verify=False)        print('NessusApiLog:get_scan:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_scan', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_scan', result=response.text)            return response.json()['hosts']    def get_scans(self):        """        url: http://<ip地址>:8834/scans        """        url = f'{self.service_url}/scans'        print('NessusApiLog:get_scans:url:', url)        response = requests.get(url, headers=self.headers, verify=False)        print('NessusApiLog:get_scans:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_scans', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_scans', result=response.text)            return response.json()['scans']    def create_scan(self, template_uuid, scan_name, policy_id, targets):        """        url: http://<ip地址>:8834/scans        doc: https://192.168.20.162:8834/api#/resources/scans/create        """        url = f'{self.service_url}/scans'        data = {            "uuid": template_uuid,            "settings": {                "emails": "",                "filter_type": "and",                "filters": [],                "launch_now": True,                "enabled": False,                "name": scan_name,                "description": "",                # "folder_id": 3,                # "scanner_id": "1",                "policy_id": str(policy_id),                "text_targets": targets,                "file_targets": ""            }        }        print('NessusApiLog:create_scan:url:', url)        response = requests.post(url, json=data, headers=self.headers, verify=False)        print('NessusApiLog:create_scan:result:', response.status_code)        if response.status_code > 300:            self.error(method='create_scan', result=response.text)        else:            self.urls.append(url)            self.debug(method='create_scan', result=response.text)            return response.json()['scan']['id']    def delete_scan(self, scan_id):        """        url: http://<ip地址>:8834/scans/<scan_id>        """        url = f'{self.service_url}/scans/{scan_id}'        print('NessusApiLog:delete_scan:url:', url)        response = requests.delete(url, headers=self.headers, verify=False)        print('NessusApiLog:delete_scan:result:', response.status_code)        if response.status_code > 300:            self.error(method='delete_scan', result=response.text)            return False        else:            self.urls.append(url)            self.debug(method='delete_scan', result=response.text)            return True    def start_scan(self, scan_id):        """        url: http://<ip地址>:8834/scans/<scan_id>/launch        """        url = f'{self.service_url}/scans/{scan_id}/launch'        print('NessusApiLog:start_scan:url:', url)        response = requests.post(url, json={}, headers=self.headers, verify=False)        print('NessusApiLog:start_scan:result:', response.status_code)        if response.status_code > 300:            self.error(method='start_scan', result=response.text)        else:            self.urls.append(url)            self.debug(method='start_scan', result=response.text)            return response.json()    def get_host(self, scan_id, host_id):        """        url: https://192.168.20.162:8834/scans/68/hosts/232        doc: https://192.168.20.162:8834/api#/resources/scans/host-details        """        url = f'{self.service_url}/scans/{scan_id}/hosts/{host_id}'        print('NessusApiLog:get_host:url:', url)        response = requests.get(url, params={}, headers=self.headers, verify=False)        print('NessusApiLog:get_host:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_host', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_host', result=response.text)            return response.json()    def get_plugin(self, scan_id, host_id, plugin_id):        """        url: https://192.168.20.162:8834/scans/68/hosts/232/plugins/100464        """        url = f'{self.service_url}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}'        print('NessusApiLog:get_plugin:url:', url)        response = requests.get(url, params={}, headers=self.headers, verify=False)        print('NessusApiLog:get_plugin:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_plugin', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_plugin', result=response.text)            return response.json()    def get_plugin_info(self, plugin_id):        """        url: https://192.168.20.162:8834/plugins/plugin/{id}        doc: https://192.168.20.162:8834/api#/resources/plugins/plugin-details        """        url = f'{self.service_url}/plugins/plugin/{plugin_id}'        print('NessusApiLog:get_plugin_info:url:', url)        response = requests.get(url, params={}, headers=self.headers, verify=False)        print('NessusApiLog:get_plugin_info:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_plugin_info', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_plugin_info', result=response.text)            return response.json()    def get_templates(self, template_type='policy'):        """        url: https://172.30.2.8:8001/editor/{type}/templates        doc: https://172.30.2.8:8001/api#/resources/editor/list        """        url = f'{self.service_url}/editor/{template_type}/templates'        print('NessusApiLog:get_templates:url:', url)        response = requests.get(url, params={}, headers=self.headers, verify=False)        print('NessusApiLog:get_templates:result:', response.status_code)        if response.status_code > 300:            self.error(method='get_templates', result=response.text)        else:            self.urls.append(url)            self.debug(method='get_templates', result=response.text)            return response.json()['templates']    def create_policy(self, template_uuid, policy_name, settings=None):        """        url: https://192.168.20.162:8834/policies        doc: https://192.168.20.162:8834/api#/resources/policies/create        """        url = f'{self.service_url}/policies/'        data = copy.copy(DefaultPolicyForAdvancedScan)        data['uuid'] = template_uuid        data['settings']['name'] = policy_name        if settings:            data['settings'].update(settings)        print('NessusApiLog:create_policy:url:', url)        response = requests.post(url, json=data, headers=self.headers, verify=False)        print('NessusApiLog:create_policy:result:', response.status_code)        if response.status_code > 300:            self.error(method='create_policy', result=response.text)            return ''        else:            self.urls.append(url)            self.debug(method='create_policy', result=response.text)            return response.json()['policy_id']    def get_api_token(self):        """        url: https://192.168.20.162:8844/nessus6.js?v=1583846805284        """        url = f'{self.service_url}/nessus6.js'        params = {'v': 1583846805284}        print('NessusApiLog:get_api_token:url:', url)        response = requests.get(url=url, params=params, headers=self.headers, verify=False)        print('NessusApiLog:get_api_token:result:', response.status_code)        if response.status_code > 300:            # self.error(method='get_api_token', result=response.text)            return ''        else:            self.urls.append(url)            # self.debug(method='get_api_token', result=response.text)            pattern = r'return"([0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12})'            return re.findall(pattern, response.text)[0]if __name__ == '__main__':    # api = Api(service_url='https://118.190.217.96:8001', username='admin', password='admin')    # api = Api(service_url='https://47.104.224.202:8001', username='admin', password='admin')    api = Api(service_url='https://172.30.2.8:8001', username='admin', password='admin')    # --- test api ---    # print(api.server_status())    # print(api.server_restart())    # print(api.server_status())    print(api.get_policies())    print(api.errors)    # print(api.get_templates())    # print(api.get_template_id_by_title('Advanced Scan'))
 |