proc_manager.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. # Copyright 2015-2019 Autoware Foundation
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #!/usr/bin/env python
  15. import os
  16. import sys
  17. import signal
  18. import socket
  19. import yaml
  20. import ctypes
  21. import psutil
  22. import select
  23. import re
  24. import pickle
  25. import threading
  26. import multiprocessing
  27. import time # for *debug*
  28. #import hashlib
  29. libc = ctypes.CDLL("libc.so.6")
  30. PR_CAPBSET_DROP=24
  31. SOCK_PATH="/tmp/autoware_proc_manager"
  32. class ProcManager:
  33. def __init__(self):
  34. self.sock = socket.socket(socket.AF_UNIX)
  35. try:
  36. os.unlink(SOCK_PATH)
  37. except:
  38. pass
  39. self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  40. self.sock.bind(SOCK_PATH)
  41. self.sock.listen(10)
  42. os.chmod(SOCK_PATH, 0777)
  43. def set_nice(self, pid, value):
  44. try:
  45. proc = psutil.Process(pid)
  46. except Exception as e:
  47. print("Error construct psutil.Process(pid={})".format(pid))
  48. return -1
  49. try:
  50. proc.set_nice(value)
  51. except AttributeError:
  52. proc.nice(value)
  53. except Exception as e:
  54. print("Error set_nice: {}".format(e))
  55. return -1
  56. print("[set_nice] pid={}, value={} ".format(
  57. pid, value))
  58. return 0
  59. def set_cpu_affinity(self, pid, cpus):
  60. try:
  61. proc = psutil.Process(pid)
  62. except psutil.NoSuchProcess as e:
  63. return
  64. print("[CPU affinity] Set pid:{}, CPUS: {}: ".format(proc.pid, cpus))
  65. ret = 0
  66. try:
  67. proc.set_cpu_affinity(cpus)
  68. except AttributeError:
  69. proc.cpu_affinity(cpus)
  70. except Exception:
  71. ret = -1
  72. return ret
  73. def _policy_to_string(self, policy):
  74. if policy == 0:
  75. return "SCHED_OTHER"
  76. elif policy == 1:
  77. return "SCHED_FIFO"
  78. elif policy == 2:
  79. return "SCHED_RR"
  80. else:
  81. raise ValueError("Invalid schedule policy argument")
  82. def set_scheduling_policy(self, pid, policy, priority):
  83. print("[sched_setscheduler] pid={}, priority={} ".format(
  84. pid, self._policy_to_string(policy), priority))
  85. param = ctypes.c_int(priority)
  86. err = libc.sched_setscheduler(pid, ctypes.c_int(policy), ctypes.byref(param))
  87. return err
  88. def _set_sched_switch(self, t):
  89. f = open('/sys/kernel/debug/tracing/events/sched/sched_switch/enable', 'w')
  90. f.write('1' if t else '0')
  91. f.close()
  92. def _set_ftrace(self, t):
  93. f = open('/sys/kernel/debug/tracing/tracing_on', 'w')
  94. f.write('1' if t else '0')
  95. f.close()
  96. def _ftrace(self, sec, pids=[]):
  97. opid = [0] * multiprocessing.cpu_count()
  98. ret = {}
  99. for cpuno in range(0, multiprocessing.cpu_count()):
  100. ret[cpuno] = []
  101. wsec = sec
  102. f = open('/sys/kernel/debug/tracing/trace_pipe', 'r')
  103. time.sleep(wsec)
  104. while True:
  105. (r, _, _) = select.select([f], [], [], 0)
  106. if len(r) <= 0:
  107. break
  108. l = f.readline()
  109. m = re.match('^.* \[([0-9]*)\].* ([0-9]*\.[0-9]*): .*==> next_comm=.* next_pid=([0-9]*) next.*$', l)
  110. if m is None:
  111. continue
  112. cpuno = int(m.group(1))
  113. t = float(m.group(2))
  114. pid = int(m.group(3))
  115. if stime == 0:
  116. stime = t
  117. if pid not in pids and pid > 0:
  118. pid = 0 # idle...
  119. #if pid != opid[cpuno] or pid in pids or opid[cpuno] in pids:
  120. if pid != opid[cpuno]:
  121. dat = [pid, t]
  122. ret[cpuno].append(dat)
  123. opid[cpuno] = pid
  124. f.close()
  125. return ret
  126. def _filterNodePid(self, pids):
  127. f = open('/sys/kernel/debug/tracing/set_ftrace_pid','w')
  128. f.close()
  129. for pid in pids:
  130. f = open('/sys/kernel/debug/tracing/set_ftrace_pid','a')
  131. f.write(str(pid))
  132. f.close()
  133. def get_ftrace(self, sec, pids):
  134. st = time.time() # for *debug*
  135. self._ftrace(0)
  136. self._filterNodePid(pids)
  137. self._set_sched_switch(True)
  138. self._set_ftrace(True)
  139. ret = self._ftrace(1, pids)
  140. self._set_ftrace(False)
  141. self._set_sched_switch(False)
  142. self._ftrace(0)
  143. et = time.time() - st # for *debug*
  144. print "* ftrace", et, "sec" # for *debug*
  145. return ret
  146. def get_ftrace_cont(self, conn, interval, pids):
  147. self._filterNodePid(pids)
  148. self._set_sched_switch(True)
  149. self._set_ftrace(True)
  150. f = open('/sys/kernel/debug/tracing/trace_pipe', 'r')
  151. while True:
  152. opid = [0] * multiprocessing.cpu_count()
  153. ret = {}
  154. for cpuno in range(0, multiprocessing.cpu_count()):
  155. ret[cpuno] = []
  156. time.sleep(interval)
  157. while True:
  158. (r, _, _) = select.select([f], [], [], 0)
  159. if len(r) <= 0:
  160. break
  161. l = f.readline()
  162. m = re.match('^.* \[([0-9]*)\].* ([0-9]*\.[0-9]*): .*==> next_comm=.* next_pid=([0-9]*) next.*$', l)
  163. if m is None:
  164. continue
  165. cpuno = int(m.group(1))
  166. t = float(m.group(2))
  167. pid = int(m.group(3))
  168. if pid not in pids and pid > 0:
  169. pid = 0 # idle...
  170. #if pid != opid[cpuno] or pid in pids or opid[cpuno] in pids:
  171. if pid != opid[cpuno]:
  172. dat = [pid, t]
  173. ret[cpuno].append(dat)
  174. opid[cpuno] = pid
  175. dat = pickle.dumps(ret)
  176. slen = 0
  177. try:
  178. while slen < len(dat):
  179. slen += conn.send(dat[slen:])
  180. except socket.error:
  181. print "ftrace disconnected"
  182. break
  183. f.close()
  184. self._set_ftrace(False)
  185. self._set_sched_switch(False)
  186. conn.close()
  187. def run(self):
  188. while True:
  189. conn, addr = self.sock.accept()
  190. data = conn.recv(4096)
  191. order = yaml.load(data)
  192. ret = 0
  193. if order['name'] == 'nice':
  194. ret = self.set_nice(order['pid'], order['nice'])
  195. elif order['name'] == 'cpu_affinity':
  196. ret = self.set_cpu_affinity(order['pid'], order['cpus'])
  197. elif order['name'] == 'scheduling_policy':
  198. ret = self.set_scheduling_policy(order['pid'],
  199. order['policy'],
  200. order['priority'])
  201. elif order['name'] == 'ftrace':
  202. ret = self.get_ftrace(order['sec'], order['pids'])
  203. elif order['name'] == 'ftrace_cont':
  204. th = threading.Thread(target=self.get_ftrace_cont,
  205. name="ftrace_cont",
  206. args=(conn, order['interval'], order['pids']))
  207. th.start()
  208. continue
  209. elif order['name'] == 'shutdown':
  210. conn.send(str.encode("0"))
  211. conn.close()
  212. print("[proc_manager.py] Shutdown process manager")
  213. break
  214. else:
  215. print("Error: unknown operation key: '{}'".format(order['name']))
  216. ret = -1
  217. if isinstance(ret, (int, long)):
  218. conn.send(str.encode(str(ret)))
  219. else:
  220. st = time.time() # for *debug*
  221. #dat = yaml.dump(ret) ## too slow!
  222. dat = pickle.dumps(ret)
  223. tt = time.time() - st # for *debug*
  224. print "** dump", tt, "sec"
  225. slen = 0
  226. try:
  227. while slen < len(dat):
  228. slen += conn.send(dat[slen:])
  229. except socket.error:
  230. print 'socket failed'
  231. tt = time.time() - st # for *debug*
  232. print "** sent", tt, "sec, size", len(dat)
  233. #print "** md5", hashlib.md5(dat).hexdigest()
  234. conn.close()
  235. def cap_last_cap():
  236. last_cap = 0
  237. with open("/proc/sys/kernel/cap_last_cap", "r") as f:
  238. last_cap = int(f.read())
  239. return last_cap
  240. def drop_capabilities():
  241. KEEP_CAPS = [6, 7, 23] # CAP_SETUID, CAP_SETGID, CAP_SYS_NICE
  242. for cap in range(0, cap_last_cap()+1):
  243. if cap not in KEEP_CAPS:
  244. libc.prctl(PR_CAPBSET_DROP, cap)
  245. def get_cpu_count():
  246. try:
  247. return psutil.NUM_CPUS
  248. except AttributeError:
  249. return psutil.cpu_count()
  250. if __name__ == "__main__":
  251. if os.getuid() != 0:
  252. print("You must run runtime manger as root user")
  253. sys.exit(-1)
  254. drop_capabilities()
  255. manager = ProcManager()
  256. manager.run()