# update: 2024-3-31-10
"""
mode:
# r    read-only (the default); raises an error if the file does not exist
# w    write-only; creates the file if it does not exist (and truncates it if it does); data must be str
# r+   read/write; raises an error if the file does not exist
# w+   read/write; creates the file if it does not exist
# a    append; not readable
# a+   append; readable and writable
# rb   binary read-only
# rb+  binary read/write; raises an error if the file does not exist
# wb   binary write-only
# wb+  binary read/write; creates the file if it does not exist
# ab   binary append; not readable
# ab+  binary append; readable and writable

A short usage sketch of these modes follows the imports below.
"""
import hashlib
import base64
import pickle
import sys
import os
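
# A minimal sketch of the open modes listed in the header docstring; 'demo.txt'
# is a hypothetical file name and this helper is not used elsewhere in the module.
def _example_open_modes():
    with open('demo.txt', 'w') as f:    # create or truncate, text write
        f.write('hello\n')
    with open('demo.txt', 'a+') as f:   # append, then read back
        f.write('world\n')
        f.seek(0)
        text = f.read()
    with open('demo.txt', 'rb') as f:   # binary read
        raw = f.read()
    return text, raw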

def read_bytes(file_path):
    """Read a whole file as bytes."""
    with open(file_path, 'rb') as f:
        return f.read()


def read_text(file_path):
    """Read a whole file as text."""
    with open(file_path, 'r') as f:
        return f.read()


def write_bytes(path, data=b''):
    """Write bytes to a file, overwriting any existing content."""
    with open(path, 'wb') as f:
        f.write(data)


def write_text(path, data='', mode='w'):
    """Write text to a file; pass mode='a' to append."""
    with open(path, mode) as f:
        f.write(data)


def write_file(path, data=b'', mode='wb'):
    """Write data to a file with an arbitrary open mode."""
    with open(path, mode) as f:
        f.write(data)

def load_pickle_file(file_path):
    """Load a pickled object from a file."""
    with open(file_path, 'rb') as f:
        return pickle.load(f)


def save_pickle_file(file_path, data):
    """Pickle an object and write it to a file."""
    with open(file_path, 'wb') as f:
        pickle.dump(data, f)
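
# Round-trip sketch for the pickle helpers above ('/tmp/demo.pkl' is a
# hypothetical path used only for illustration).
def _example_pickle_roundtrip(path='/tmp/demo.pkl'):
    save_pickle_file(path, {'a': 1, 'b': [2, 3]})
    return load_pickle_file(path)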

def get_file_md5(file_path):
    """Get the MD5 of a file (reads the whole file into memory)."""
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        m.update(f.read())
    return m.hexdigest()


def get_big_file_md5(file_path, block_size=8 * 1024):
    """Get the MD5 of a file, reading it block by block (default block size 8 KB)."""
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        while True:
            block = f.read(block_size)
            if not block:
                break
            m.update(block)
    return m.hexdigest()

def _file_iterator(file_object, block_size=8 * 1024):
    """Yield blocks from an open file object; the file is closed when the iterator finishes."""
    with file_object:
        block = file_object.read(block_size)
        while block:
            yield block
            block = file_object.read(block_size)


def get_big_file_md5_v2(file_path):
    """Get the MD5 of a file via the block iterator above (default block size 8 KB)."""
    m = hashlib.md5()
    for block in _file_iterator(open(file_path, 'rb')):
        m.update(block)
    return m.hexdigest()
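
# Illustrative comparison of the MD5 helpers above; a sketch only, with a
# hypothetical path. All three return the same digest for the same file.
def _example_md5(file_path='/tmp/sample.bin'):
    return (get_file_md5(file_path),
            get_big_file_md5(file_path, block_size=64 * 1024),
            get_big_file_md5_v2(file_path))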

def get_file_b64_md5(file_path):
    """Get the MD5 of the Base64 encoding of a file (whole file in memory)."""
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        m.update(base64.b64encode(f.read()))
    return m.hexdigest()


def get_big_file_b64_md5(file_path, block_size=3 * 1024 * 1024):
    """
    Get the MD5 of the Base64 encoding of a file, streaming in blocks
    (default block size 3 MB; the block size must stay a multiple of 3 so
    no Base64 padding is inserted between blocks). Writes a temporary
    '<file_path>.b64' file as a side effect.
    """
    with open(file_path, 'rb') as f1, open(f'{file_path}.b64', 'wb') as f2:
        while True:
            block = f1.read(block_size)
            if not block:
                break
            f2.write(base64.b64encode(block))
    return get_big_file_md5(f'{file_path}.b64')
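
# Usage sketch for the streaming Base64 MD5: it leaves a '<file_path>.b64'
# copy behind, which the caller may want to remove. Hypothetical path;
# relies only on the helpers defined in this module.
def _example_b64_md5(file_path='/tmp/sample.bin'):
    digest = get_big_file_b64_md5(file_path)
    remove_file(f'{file_path}.b64')  # drop the temporary Base64 copy
    return digest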

def get_var_size(obj, unit='MB'):
    """
    Get the shallow in-memory size of a Python object (via sys.getsizeof).
    unit: GB/MB/KB/B
    """
    unit_dict = {
        'GB': 3,
        'MB': 2,
        'KB': 1,
        'B': 0,
    }
    byte = sys.getsizeof(obj)
    size = byte / (1024 ** unit_dict.get(unit))
    return float(round(size, 1))


def get_file_size(file_path, unit='MB'):
    """
    Get the on-disk size of a file; returns None if the path is not a file.
    unit: GB/MB/KB/B
    """
    if not is_file(file_path):
        return None
    unit_dict = {
        'GB': 3,
        'MB': 2,
        'KB': 1,
        'B': 0,
    }
    byte = os.path.getsize(file_path)
    size = byte / (1024 ** unit_dict.get(unit))
    return float(round(size, 1))
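
# Usage sketch for the size helpers above: get_var_size reports the shallow
# sys.getsizeof size of an in-memory object, get_file_size the on-disk size
# (here of this module file itself).
def _example_sizes():
    blob = bytearray(5 * 1024 * 1024)
    return get_var_size(blob), get_file_size(__file__, unit='KB')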

def get_file_path_list(dir_path='/root', path_list=None):
    """
    Get every file under the given directory, recursively.
    Example:
        get_file_path_list('/opt')
        ['/opt/1.txt', '/opt/2.txt']
    # Alternative with glob:
    # import glob
    # if os.path.isfile(input_dir):
    #     img_list = [input_dir]
    # else:
    #     img_list = sorted(glob.glob(os.path.join(input_dir, '*')))
    """
    if path_list is None:
        path_list = []
    # os.walk already descends into sub-directories, so a single pass is enough.
    for path, folders, files in os.walk(dir_path):
        for file_name in files:
            file_path = os.path.join(path, file_name)
            if file_path in path_list:
                continue
            # if file_path.split('.')[-1] == 'pdb':
            #     remove_file(file_path)
            # print(f"#file_path: {file_path}")
            path_list.append(file_path)
    return path_list
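
# Usage sketch: list files and filter by extension, in the spirit of the
# commented-out '.pdb' clean-up inside get_file_path_list above. '/opt' is
# reused from the docstring example; a sketch only.
def _example_list_pdb_files(dir_path='/opt'):
    return [p for p in get_file_path_list(dir_path)
            if p.lower().endswith('.pdb')]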

def mkdir(dir_path):
    """
    Create a directory (and any missing parents).
    A relative path is resolved against the script directory (see get_pwd).
    Example:
        mkdir('/share/abc/xyz')
        True
    """
    if not dir_path:
        return False
    if not os.path.isabs(dir_path):
        dir_path = os.path.join(get_pwd(), dir_path)
    os.makedirs(dir_path, exist_ok=True)
    return True
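
# Usage sketch for mkdir: absolute and relative paths are both accepted, and a
# relative path is created under the script directory (see get_pwd below).
# 'output/logs' is a hypothetical example path.
def _example_mkdir():
    return mkdir('output/logs') and is_dir(os.path.join(get_pwd(), 'output/logs'))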

def get_pwd():
    """Get the directory of the running script."""
    path = sys.path[0]
    if not os.path.isdir(path):
        path = os.path.dirname(path)
    return path


def is_file(file_path):
    """Return True if the path is an existing regular file."""
    return os.path.isfile(file_path)


def is_dir(file_path):
    """Return True if the path is an existing directory."""
    return os.path.isdir(file_path)


def remove_file(file_path):
    """Delete a file."""
    return os.remove(file_path)


def move_file(s_path, d_path):
    """Move (rename) a file."""
    return os.rename(s_path, d_path)

if __name__ == '__main__':
    # --- test ---
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp'
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt'
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt\thirdparty'
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoServer\webrtcinterop\x64\Release'
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt\webrtcinterop\x64\Release'
    # file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoServer'
    file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt'
    path_list = get_file_path_list(file_dir)
    print(path_list)
    # remove_file(r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt\webrtcinterop\x64\Release\webrtcinterop.obj')