# update: 2021-7-14-11 import redis import pickle class Client(redis.StrictRedis): def __init__(self, host='fra-middleware-redis', port=6379, db=0, password='', channels=[]): """ 内部访问: host: fra-middleware-redis port: 6379 远程访问: host: 118.190.217.96、192.168.20.162 port: 7070 """ self.host = host self.port = port self.db = db super().__init__(host=self.host, port=self.port, db=self.db) self.channels = channels if self.channels: """ dir(self.pubsub()): ['HEALTH_CHECK_MESSAGE', 'PUBLISH_MESSAGE_TYPES', 'UNSUBSCRIBE_MESSAGE_TYPES', 'channels', 'check_health', 'close', 'connection', 'connection_pool', 'encoder', 'execute_command', 'get_message', 'handle_message', 'health_check_response', 'ignore_subscribe_messages', 'listen', 'on_connect', 'parse_response', 'patterns', 'pending_unsubscribe_channels', 'pending_unsubscribe_patterns', 'ping', 'psubscribe', 'punsubscribe', 'reset', 'run_in_thread', 'shard_hint', 'subscribe', 'subscribed', 'unsubscribe'] """ self.channel_pubsub = self.pubsub() self.channel_pubsub.psubscribe(channels) # self.channel_listen = self.channel_pubsub.listen() def get_config(self): """ CONFIG GET|SET: 分别用 config_get 和 config_set 实现。 """ return self.config_get() def set_config(self, name, value): """ CONFIG GET|SET: 分别用 config_get 和 config_set 实现。 """ return self.config_set(name=name, value=value) def get_elements(self, key): """获取列表元素""" return self.lrange(key, 0, -1) def get_elements_length(self, key): """获取列表长度""" return self.llen(key) def pop_element(self, key, position='RightEnd'): """ 弹出元素 position: 出队位置 LeftEnd 左端 RightEnd 右端 """ _dict = { 'LeftEnd': 'lpop', 'RightEnd': 'rpop', } func = getattr(self, _dict.get(position)) pickle_data = func(key) element = pickle.loads(pickle_data) if pickle_data else None return element def add_element(self, key, element, position='RightEnd'): """ 增加元素 position: 入队位置 LeftEnd 左端 RightEnd 右端 """ _dict = { 'LeftEnd': 'lpush', 'RightEnd': 'rpush', } func = getattr(self, _dict.get(position)) pickle_data = pickle.dumps(element) elements_length = func(key, pickle_data) return elements_length def set_one(self, key, data, expire_time=None): """压缩存储""" pickle_data = pickle.dumps(data) return self.set(key, pickle_data, ex=expire_time) def get_one(self, key): """解压获取""" pickle_data = self.get(key) return pickle.loads(pickle_data) if pickle_data else None def push_one(self, channel, data): """ 推送一条 """ return self.publish(channel, pickle.dumps(data)) def pull_one(self, channel): """ 拉取一条 msg = self.channel_pubsub.parse_response(block=False, timeout=60) msg = self.channel_pubsub.parse_response(block=False) [b'pmessage', b'cctv1', b'cctv1', b'3'] msg = self.channel_pubsub.get_message() {'type': 'pmessage', 'pattern': b'cctv1', 'channel': b'cctv1', 'data': b'asdfasdfasdf'} """ message = self.channel_pubsub.get_message() channel = channel.encode() if message and message.get('pattern') and message.get('channel') == channel: return pickle.loads(message.get('data')) def pull_one_by_channel(self, channel): """ 拉取一条 """ message = self.channel_pubsub.get_message() channel = channel.encode() if message and message.get('pattern') and message.get('channel') == channel: return pickle.loads(message.get('data')) def pull_one_by_channels(self): """ 拉取一条 """ message = self.channel_pubsub.get_message() channels = [i.encode() for i in self.channels] if message and message.get('pattern') and message.get('channel') in channels: return pickle.loads(message.get('data')) def pull_one_by_channels_v2(self, channels): """ 拉取一条 """ message = self.channel_pubsub.get_message() channels = [i.encode() for i in channels] if message and message.get('pattern') and message.get('channel') in channels: return pickle.loads(message.get('data')) def pull_one_by_channels_v3(self, channels): """ 拉取一条(性能不佳) """ self.channel_pubsub.psubscribe(channels) message = self.channel_pubsub.get_message() channels = [i.encode() for i in channels] if message and message.get('pattern') and message.get('channel') in channels: return pickle.loads(message.get('data')) def print_all(self): """打印""" for key in self.keys(): key_type = self.type(key) # if key_type != b"stream": # continue if key_type == b'string': value = self.get(key) elif key_type == b'list': value = self.lrange(key, 0, -1) elif key_type == b'set': value = self.smembers(key) elif key_type == b'zset': value = self.zrange(key, 0, -1) elif key_type == b"hash": value = self.hgetall(key) elif key_type == b"stream": value = self.xread({key: b"0-0"}) else: value = None print(f"type: {key_type} | key: {key}") # print(f"type: {key_type} | key: {key} | value: {value}") def filter_keys(self): re_list = [] for key in self.keys(): if self.type(key) != b'string': continue if b'warn' not in key: continue re_list.append(key) return re_list def test(self): while True: msg = self.pull() if msg: print(msg) if __name__ == '__main__': # --- init --- # db0 = Client(host='192.168.30.49', port=7070, channels=['cctv1', 'cctv2']) db0 = Client(host='192.168.30.59', port=7070) # db0 = Client(host='192.168.1.190', port=7070) # db0 = Client(host='fra-middleware-redis', port=6379) # --- test --- out = db0.get_elements_length('tasklist1') print(out) # --- test -- # db0.set_config('client-output-buffer-limit', 'normal 0 0 0 slave 268435456 67108864 60 pubsub 0 0 0') # out = db0.get_config() # print(out) # --- test --- # db0.push_one('cctv1', 'asdfasdfasdf') # --- test --- # db0.set_object('compare_face_task', [1, 2, 3]) # --- test --- # out = db0.get_object('compare_face_task') # print(out) # --- test --- # db0.hset(name='test001', key='test002', value='test003') # out = db0.hgetall(name='test001') # print(out) # --- test --- # db0.print_all() # --- test --- # db0.flushdb() # --- test --- # import pickle # out = db0.get('real_cam') # out = pickle.loads(out) # print(out) # --- test --- # import pickle # out = db0.get('webcam') # out = pickle.loads(out) # print(out) # --- test --- # import pickle # config = { # 'aass': 12, # '1': { # '2': { # } # } # } # # config = pickle.dumps(config) # db0.set('webcam', config, ex=1000) # out = db0.get('webcam') # out = pickle.loads(out) # print(out) # --- test --- # out = db0.filter_keys() # print(out) # out = db0.get(out[0]) # print(out) # --- test --- # db0.delete('warn:192.168.1.68:1604928475') # db0.delete('warn:192.168.1.68:1604928695') # db0.delete('warn:192.168.1.68:1604928479') # db0.delete('warn:192.168.1.68:1604928680') # db0.delete('warn:192.168.1.68:1604928491') # db0.print_all() # --- test --- # import pickle # data = pickle.dumps(0) # db0.set('warn:192.168.1.68:', data, ex=1000*86400) # out = db0.get(f'warn:192.168.1.68:') # print(out) # --- test --- # import pickle # data = { # 'last_at': 112233, # 'image': pickle.dumps({}), # } # db0.hmset(name='192.168.1.68:admin:HuaWei123:', mapping=data) # out = db0.hmget(name='192.168.1.68:admin:HuaWei123:', keys='last_at') # print(out) # out = db0.hget(name='192.168.1.68:admin:HuaWei123:', key='last_at') # print(out) # --- test --- # db0.set('food', b'mutton', ex=1000) # print(type(db0.get('food'))) # print(repr(db0.get('food'))) # db0.print_all() # --- test --- # data = db0.get('192.168.1.68:admin:HuaWei123') # import numpy as np # data = np.frombuffer(data) # print(repr(data)) # --- test --- # data = db0.get('192.168.1.68:admin:HuaWei123') # import pickle # data = pickle.loads(data) # import cv2 # cv2.imshow('data', data) # cv2.waitKey(0)