123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- #!/usr/bin/env python2
- # -*- coding: utf-8 -*-
- import os, shutil, time, glob
- import argparse, multiprocessing
- import requests
- import common
- logger = common.genlogger()
- def getargs():
- parser = argparse.ArgumentParser()
- parser.add_argument("--ip", default="localhost")
- parser.add_argument("--port", default="3000")
- parser.add_argument("--dir", default=common.TMP_PATH)
- parser.add_argument("--threads", type=int, default=4)
- parser.add_argument("--retry", default=5)
- parser.add_argument("--wait", default=10) # [sec]
- parser.add_argument("--rename", action="store_true")
- args = parser.parse_args()
- args.dir = common.normpath(args.dir)
- return args
- def status(url):
- return requests.get(url).json()
- def upload(url, path):
- # post form
- utime = common.getutime()
- hsh = common.genhash(path)
- form = { "utime": utime, "hash": hsh }
- # post files (single data)
- files = { "file": open(path, 'rb') }
- # upload
- logger.info("upload start: {}".format(path))
- logger.debug("unixtime: {}, hash: {}".format(utime, hsh))
- return requests.post(url, data=form, files=files).json()
- def worker(url, path, retry=-1, wait=1, rename=False, rnfmt="{}.uploaded"):
- for i in xrange(retry+1):
- try:
- res = upload(url, path)
- if not res["success"]: raise Exception
- logger.info("upload complete: {}".format(path))
- if rename: # rename mode
- dstpath = rnfmt.format(path)
- shutil.move(path, dstpath)
- logger.info("rename file -> {}".format(dstpath))
- else: # remove mode
- os.remove(path)
- logger.info("remove file: {}".format(path))
- return
- except Exception as e:
- logger.info("upload error: {}".format(e))
- logger.info("retry upload ({}): {}".format(i, path))
- time.sleep(wait)
- logger.info("failed upload!: {}".format(path))
- if __name__ == "__main__":
- args = getargs()
- url = "http://{}:{}".format(args.ip, args.port)
- urlup = "{}/upload".format(url)
- urlst = "{}/status".format(url)
- logger.info("URL = {}".format(url))
- logger.info("watching directory = {}".format(args.dir))
- # check connection
- while True:
- try:
- # use to check connection only
- res = status(urlst)
- logger.info("connection success, status = {}".format(res))
- break
- except Exception as e:
- logger.error("connection error = {}".format(e))
- time.sleep(args.wait)
- # watch and upload
- queue = {} # { path : { job, started } }
- while True:
- time.sleep(args.wait) # loop rate
- # watch direcory
- paths = glob.glob("{}/{}".format(args.dir, "*.bag"))
- if len(paths) == 0:
- logger.info("no file ...")
- continue
- # prepare jobs
- for path in paths:
- path = common.normpath(path)
- if not path in queue.keys():
- process = multiprocessing.Process(target=worker,
- args=(urlup, path, args.retry, args.wait, args.rename,))
- queue[path] = { "process": process, "started": False }
- # start jobs
- exeq = queue.items()[:args.threads]
- logger.info("queue size = {}, threads = {}".format(len(queue), len(exeq)))
- for path in [ path for path, job in exeq if not job["started"] ]:
- queue[path]["process"].start()
- queue[path]["started"] = True
- # delete finished jobs
- [ queue.pop(path) for path, job, in queue.items()
- if not job["process"].is_alive() and job["started"] ]
|