# update: 2021-3-26-13
"""Small paramiko-based SSH helper for running commands and scripts remotely.

Reference: https://www.jianshu.com/p/512a5641b501

Upload a script::

    sftp = s.open_sftp()
    sftp.put('../test.sh', '/data/test.sh')
    sftp.close()

Execute the script and delete it::

    docker exec zelu_8891 bash -c "python /home/server/projects/python-admin/zelu/libraries/protocol_l4/ssh/client.py"
"""
import traceback

import paramiko


def test():
    """Smoke test: password-login to a fixed host and print `ls -all /home`."""
    host_ip = '192.168.30.13'
    username = 'fish'
    password = 'admin'
    client = paramiko.SSHClient()
    try:
        # --- password login ---
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=host_ip, port=22, username=username, password=password)
        # --- test ---
        stdin, stdout, stderr = client.exec_command('ls -all /home')
        # stdin, stdout, stderr = client.exec_command('ls /home/ShellScripts')
        print('--- --- --- ---')
        print(stdout.read().decode('utf-8'))
    except Exception as e:
        # Best-effort smoke test: report the failure instead of crashing.
        print(e)
    finally:
        client.close()


class Client(paramiko.SSHClient):
    """SSH client that connects on construction and wraps common remote tasks.

    The helper methods are best-effort: on failure they print the exception
    name and traceback and return ``None`` instead of raising.
    """

    def __init__(self, ipv4='172.18.0.1', port=22, username='fish', password='admin'):
        """Open a password-authenticated SSH connection to ``ipv4:port``."""
        super().__init__()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; acceptable on a trusted LAN only.
        self.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.connect(hostname=ipv4, port=port, username=username, password=password)

    @staticmethod
    def _report_failure(method_name, exception):
        """Print the exception class name and the current traceback."""
        print(f"SSHClient.{method_name}.exception: {exception.__class__.__name__}")
        print(f"SSHClient.{method_name}.traceback: {traceback.format_exc()}")

    def _read_stdout(self, command, stdin_data=None):
        """Run ``command`` remotely and return its decoded standard output.

        When ``stdin_data`` is given it is written to the command's stdin,
        after which the write side is shut down so the remote side sees EOF.
        """
        stdin, stdout, stderr = self.exec_command(command)
        if stdin_data is not None:
            stdin.write(stdin_data)
            stdin.flush()
            stdin.channel.shutdown_write()
        # errors='replace' keeps one undecodable byte from discarding the
        # whole output.
        return stdout.read().decode('utf-8', errors='replace')

    @staticmethod
    def _parse_utilization_rate(df_output, mount_point):
        """Return the integer ``Use%`` of ``mount_point`` from ``df`` output.

        Returns 0 when no row for ``mount_point`` carries a percentage.
        Mount points containing whitespace are not supported.
        """
        for row in df_output.splitlines():
            fields = row.split()
            # The mount point is the last whitespace-separated column.
            if not fields or fields[-1] != mount_point:
                continue
            for field in fields:
                if '%' in field:
                    return int(field.strip('%'))
        return 0

    def run_script(self, script_path, host_ip, username, password,
                   command='sh /root/1.sh && cat /home/out.txt',
                   remote_path='/root/1.sh', port=22):
        """Upload a local script to a host, run ``command`` and return stdout.

        Args:
            script_path: Local path of the script to upload, e.g.
                '/home/server/projects/user-dashboard/operation/builtin_script/linux基线检查脚本.sh'.
            host_ip: Host to (re)connect to before uploading.
            username: Login user on ``host_ip``.
            password: Login password on ``host_ip``.
            command: Shell command executed after the upload; the default
                runs the uploaded script and prints /home/out.txt.
            remote_path: Destination path of the uploaded script. If changed,
                ``command`` must reference the same path.
            port: SSH port of ``host_ip``.

        Returns:
            The decoded standard output of ``command``.
        """
        # The constructor already opened a connection; drop it first so the
        # previous transport is not leaked when connecting to ``host_ip``.
        self.close()
        self.connect(hostname=host_ip, port=port, username=username, password=password)
        # The context manager closes the SFTP session even if the upload fails.
        with self.open_sftp() as sftp:
            sftp.put(script_path, remote_path)
        output = self._read_stdout(command)
        print(output)
        print('--- --- --- ---')
        return output

    def run_command(self, command):
        """Execute ``command`` and return its stdout, or ``None`` on failure."""
        try:
            return self._read_stdout(command)
        except Exception as exception:
            self._report_failure('run_command', exception)
            return None

    def run_sudo_command(self, command, password):
        """Execute ``command`` under ``sudo -S``; return stdout or ``None``.

        The password is sent on the command's stdin rather than being
        interpolated into the shell string, so quotes, ``$`` or backticks in
        it cannot break the command line, and the secret does not appear in
        the remote command text.
        """
        try:
            return self._read_stdout(f"sudo -S {command}", stdin_data=f"{password}\n")
        except Exception as exception:
            self._report_failure('run_sudo_command', exception)
            return None

    def get_disk_utilization_rate(self, path='/'):
        """Return the used-space percentage of the filesystem mounted at ``path``.

        Returns:
            The integer percentage reported by ``df -h`` for ``path``, 0 when
            no such mount point is listed, or ``None`` when the command or
            the parsing fails.
        """
        try:
            output = self._read_stdout("df -h")
            return self._parse_utilization_rate(output, path)
        except Exception as exception:
            self._report_failure('get_disk_utilization_rate', exception)
            return None

    def close(self):
        """Close the SSH connection."""
        # Must delegate to the parent: calling self.close() here would
        # recurse without end.
        super().close()


if __name__ == '__main__':
    # --- init ---
    # ssh = Client('192.168.30.13', 22, 'fish', 'admin')
    # ssh = Client('172.18.0.1', 22, 'fish', 'admin')
    # ssh = Client('192.168.1.45', 22, 'server', 'server')
    # ssh = Client('192.168.30.13', 22, 'server', 'server')
    ssh = Client('10.10.10.89', 22, 'nvidia', '123456')
    try:
        # --- test ---
        out = ssh.get_disk_utilization_rate()
        print(out)

        # --- test ---
        # out = ssh.run_command('date +%Y-%m-%d-%H-%M-%S')  # date +"%Y-%m-%d %H:%M:%S"
        # print(out)

        # --- test ---
        # ssh.run_sudo_command('echo -e "\n#x1" >> /etc/network/interfaces', 'admin')
        # ssh.run_sudo_command('echo -e "\n#x1" >> /etc/network/interfaces', 'server')
        # ssh.run_command('echo -e "\n#x2" >> /etc/network/interfaces')
        # ssh.run_command('sudo echo -e "\n#x3" >> /etc/network/interfaces')
        # out = ssh.run_command("sudo chmod 777 /etc/network/interfaces")
        # out = ssh.run_command("cat /etc/network/interfaces")
        # print(out)

        # --- test ---
        # out = ssh.run_command("ifconfig")
        # for row in out.split('\n\n'):
        #     if row[:4] != 'eth0':
        #         continue
        #     for one in row.split('\n'):
        #         one = one.strip()
        #         if one[:4] != 'inet':
        #             continue
        #         if one[:5] == 'inet6':
        #             continue
        #         one = [i for i in one.split(' ') if i]
        #         ipv4, netmask = one[1], one[3]
        #         print(ipv4, netmask)

        # --- test ---
        # out = ssh.run_command('ls /home/server/images')
        # out = ssh.run_command('rm -rf /home/server/images/*')
        # out = ssh.run_sudo_command('rm -rf /home/server/images/*', 'admin')
        # print(out)

        # --- test ---
        # out0 = ssh.run_sudo_command('chmod 777 /etc/network/interfaces', 'admin')
        # out1 = ssh.run_command('echo -e "\n#x2" >> /etc/network/interfaces')
        # out2 = ssh.run_command("cat /etc/network/interfaces")
        # print(out2)

        # --- test ---
        # out = ssh.run_command("ls -all /home")
        # out = ssh.run_command("df -h | grep 234G | awk \'{print $5}\'")
        # out = ssh.run_command("df -h | grep \"/dev/vda1\" | awk \'{print $5}\'")
        # out = ssh.run_command("df -h ")
        # print(out)

        # --- test ---
        # out = ssh.run_script(1, 2, 3, 4)
        # print(repr(out))

        # --- test ---
        # out = ssh.run_script('/home/UserFiles/hello.sh', '<host>', 'root', '<password>', 'sh /root/1.sh')
        # print(repr(out))
    finally:
        # Release the connection even when a test above raises.
        ssh.close()