client.py

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
import os, shutil, time, glob
import argparse, multiprocessing
import requests
import common

logger = common.genlogger()
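
# NOTE: `common` is a local helper module that is not included in this file.
# From the call sites below it is assumed to provide roughly:
#   genlogger()   -> a configured logging.Logger
#   TMP_PATH      -> default directory to watch (str)
#   normpath(p)   -> normalized path string
#   getutime()    -> current unix time
#   genhash(p)    -> content hash of the file at p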

def getargs():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", default="localhost")
    parser.add_argument("--port", default="3000")
    parser.add_argument("--dir", default=common.TMP_PATH)
    parser.add_argument("--threads", type=int, default=4)
    parser.add_argument("--retry", type=int, default=5)
    parser.add_argument("--wait", type=int, default=10)  # [sec]
    parser.add_argument("--rename", action="store_true")
    args = parser.parse_args()
    args.dir = common.normpath(args.dir)
    return args

def status(url):
    return requests.get(url).json()

def upload(url, path):
    # metadata posted as form fields
    utime = common.getutime()
    hsh = common.genhash(path)
    form = {"utime": utime, "hash": hsh}
    logger.info("upload start: {}".format(path))
    logger.debug("unixtime: {}, hash: {}".format(utime, hsh))
    # post as multipart/form-data (single file); the handle is closed on exit
    with open(path, "rb") as f:
        return requests.post(url, data=form, files={"file": f}).json()

def worker(url, path, retry=-1, wait=1, rename=False, rnfmt="{}.uploaded"):
    for i in xrange(retry + 1):
        try:
            res = upload(url, path)
            if not res["success"]:
                raise Exception("server reported success = False")
            logger.info("upload complete: {}".format(path))
            if rename:  # rename mode: keep the file, mark it as uploaded
                dstpath = rnfmt.format(path)
                shutil.move(path, dstpath)
                logger.info("rename file -> {}".format(dstpath))
            else:  # remove mode: delete the uploaded file
                os.remove(path)
                logger.info("remove file: {}".format(path))
            return
        except Exception as e:
            logger.info("upload error: {}".format(e))
            logger.info("retry upload ({}): {}".format(i, path))
            time.sleep(wait)
    logger.info("failed upload!: {}".format(path))
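
# NOTE: the server side is not shown here. From the requests above it is
# assumed to expose GET /status (a health check returning JSON) and
# POST /upload (multipart form with "utime", "hash" and "file" fields,
# answering with JSON that includes a boolean "success").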

if __name__ == "__main__":
    args = getargs()
    url = "http://{}:{}".format(args.ip, args.port)
    urlup = "{}/upload".format(url)
    urlst = "{}/status".format(url)
    logger.info("URL = {}".format(url))
    logger.info("watching directory = {}".format(args.dir))
    # check connection
    while True:
        try:
            # used to check the connection only
            res = status(urlst)
            logger.info("connection success, status = {}".format(res))
            break
        except Exception as e:
            logger.error("connection error = {}".format(e))
            time.sleep(args.wait)
    # watch and upload
    queue = {}  # { path: { "process": Process, "started": bool } }
    while True:
        time.sleep(args.wait)  # loop rate
        # watch directory
        paths = glob.glob("{}/{}".format(args.dir, "*.bag"))
        if len(paths) == 0:
            logger.info("no file ...")
            continue
        # prepare jobs
        for path in paths:
            path = common.normpath(path)
            if path not in queue:
                process = multiprocessing.Process(
                    target=worker,
                    args=(urlup, path, args.retry, args.wait, args.rename))
                queue[path] = {"process": process, "started": False}
        # start jobs (at most --threads queue entries are considered per cycle)
        exeq = queue.items()[:args.threads]
        logger.info("queue size = {}, threads = {}".format(len(queue), len(exeq)))
        for path in [path for path, job in exeq if not job["started"]]:
            queue[path]["process"].start()
            queue[path]["started"] = True
        # delete finished jobs
        for path, job in queue.items():
            if job["started"] and not job["process"].is_alive():
                queue.pop(path)
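
For reference, a typical invocation under the defaults above might look like the
following; the address and watch directory are placeholders, and the matching
server must already be reachable at that address:

    python2 client.py --ip 192.168.0.10 --port 3000 --dir /data/bags --rename

With --rename the client keeps each uploaded .bag file and renames it to
"<name>.uploaded"; without it, the file is removed after a confirmed upload.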