123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- """
- https://www.jianshu.com/p/512a5641b501
- # --- 上传脚本 ---
- sftp = s.open_sftp()
- sftp.put('../test.sh', '/data/test.sh')
- sftp.close()
- # --- 执行脚本并删除 ---
- docker exec zelu_8891 bash -c "python /home/server/projects/python-admin/zelu/libraries/protocol_l4/ssh/client.py"
- """
- import paramiko
def test():
    """Connect to a fixed host over SSH and print the listing of ``/home``.

    Any exception raised while connecting or running the command is printed
    instead of propagated; the client is closed in every case.
    """
    host_ip, username, password = '192.168.30.13', 'fish', 'admin'
    client = paramiko.SSHClient()
    try:
        # Accept host keys that are not yet known before connecting.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=host_ip,
            port=22,
            username=username,
            password=password,
        )

        _, stdout, _ = client.exec_command('ls -all /home')

        print('--- --- --- ---')
        raw_listing = stdout.read()
        print(raw_listing.decode('utf-8'))
    except Exception as error:
        print(error)
    finally:
        client.close()
class Client(paramiko.SSHClient):
    """SSH client that connects on construction and wraps a few remote commands.

    The helper methods are best-effort: on failure they print the exception
    name and traceback and return ``None`` instead of raising.
    """

    def __init__(self, ipv4='172.18.0.1', port=22, username='fish', password='admin'):
        """Open a password-authenticated SSH connection to ``ipv4:port``.

        NOTE(review): ``AutoAddPolicy`` accepts unknown host keys without
        verification; confirm this is acceptable for the target network.
        """
        super().__init__()
        self.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.connect(hostname=ipv4, port=port, username=username, password=password)

    @staticmethod
    def _report_exception(method_name, exception):
        """Print the exception class name and current traceback for ``method_name``."""
        import traceback
        print(f"SSHClient.{method_name}.exception: {exception.__class__.__name__}")
        print(f"SSHClient.{method_name}.traceback: {traceback.format_exc()}")

    @staticmethod
    def _parse_disk_utilization(df_output, path='/'):
        """Return the integer ``Use%`` of the ``df`` row mounted at ``path``.

        Returns 0 when no row for ``path`` carries a percentage. If several
        rows match, the last one wins.
        """
        mount_point = path.rstrip('/') or '/'
        percentage = ''
        for row in df_output.splitlines():
            columns = row.split()
            # The mount point is the last column of a ``df`` row.
            if not columns or columns[-1] != mount_point:
                continue
            for column in columns:
                if '%' in column:
                    percentage = column
                    break
        if '%' not in percentage:
            return 0
        return int(percentage.strip('%'))

    def run_script(self, script_path, host_ip, username, password, command='sh /root/1.sh && cat /home/out.txt'):
        """Upload a local script to ``/root/1.sh`` on ``host_ip`` and run ``command``.

        The client reconnects to ``host_ip`` on port 22 with the given
        credentials, uploads ``script_path`` over SFTP, executes ``command``,
        prints its stdout followed by a separator line, and returns the stdout
        text.
        """
        # Release the transport opened earlier so reconnecting does not leak it.
        self.close()
        self.connect(hostname=host_ip, port=22, username=username, password=password)

        sftp = self.open_sftp()
        try:
            sftp.put(script_path, '/root/1.sh')
        finally:
            # Close the SFTP session even when the upload fails.
            sftp.close()

        stdin, stdout, stderr = self.exec_command(command)
        output = stdout.read().decode('utf-8')
        print(output)
        print('--- --- --- ---')
        return output

    def run_command(self, command):
        """Run ``command`` remotely and return its stdout decoded as UTF-8.

        Returns ``None`` (after printing diagnostics) if anything fails.
        """
        try:
            stdin, stdout, stderr = self.exec_command(command)
            return stdout.read().decode('utf-8')
        except Exception as exception:
            self._report_exception('run_command', exception)
            return None

    def run_sudo_command(self, command, password):
        """Run ``command`` under ``sudo`` and return its stdout decoded as UTF-8.

        The password is written to the remote stdin rather than embedded in
        the command line, so it is neither shell-interpreted nor visible in
        the remote process list. Returns ``None`` (after printing
        diagnostics) if anything fails.
        """
        try:
            # -S makes sudo read the password from stdin; -p '' silences the prompt.
            stdin, stdout, stderr = self.exec_command(f"sudo -S -p '' {command}")
            stdin.write(f"{password}\n")
            stdin.flush()
            # Signal EOF so a command that reads stdin does not block forever.
            stdin.channel.shutdown_write()
            return stdout.read().decode('utf-8')
        except Exception as exception:
            self._report_exception('run_sudo_command', exception)
            return None

    def get_disk_utilization_rate(self, path='/'):
        """Return the used-space percentage of the filesystem mounted at ``path``.

        Runs ``df -h`` remotely and reads the ``Use%`` value of the row whose
        mount point equals ``path`` (the root filesystem by default). Returns
        0 when no such row is found and ``None`` (after printing diagnostics)
        if anything fails.
        """
        try:
            stdin, stdout, stderr = self.exec_command("df -h")
            output = stdout.read().decode('utf-8')
            return self._parse_disk_utilization(output, path)
        except Exception as exception:
            self._report_exception('get_disk_utilization_rate', exception)
            return None

    def close(self):
        """Close the SSH connection."""
        # Delegate to paramiko; calling self.close() here would recurse forever.
        super().close()
if __name__ == '__main__':
    # Manual check: connect to a fixed host and print its root disk usage.
    ssh = Client(
        ipv4='10.10.10.89',
        port=22,
        username='nvidia',
        password='123456',
    )
    print(ssh.get_disk_utilization_rate())
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|