# update: 2021-7-14-11
- import redis
- import pickle
class Client(redis.StrictRedis):
    """Redis client with pickle-based helpers for lists, keys and pub/sub.

    The helper methods serialize Python objects with ``pickle`` before
    writing them and unpickle what they read back.

    SECURITY NOTE(review): ``pickle.loads`` executes arbitrary code embedded
    in the payload. Every ``pop_*``/``get_one``/``pull_*`` helper unpickles
    bytes read from Redis, so only point this client at a Redis instance
    whose writers are trusted.
    """

    # ``position`` argument -> name of the redis list command to invoke.
    _POP_COMMANDS = {'LeftEnd': 'lpop', 'RightEnd': 'rpop'}
    _PUSH_COMMANDS = {'LeftEnd': 'lpush', 'RightEnd': 'rpush'}

    def __init__(self, host='fra-middleware-redis', port=6379, db=0, password='', channels=None):
        """Connect to Redis and optionally pattern-subscribe to ``channels``.

        Internal access:
            host: fra-middleware-redis
            port: 6379
        Remote access:
            host: 118.190.217.96, 192.168.20.162
            port: 7070

        Args:
            host: Redis server host name or IP.
            port: Redis server port.
            db: Database index.
            password: Password for the server; an empty string means
                "no authentication".
            channels: Channel patterns to ``psubscribe`` to right away.
                ``None`` (the default) or an empty list subscribes to nothing.
        """
        self.host = host
        self.port = port
        self.db = db
        # Forward the password (previously accepted but silently ignored).
        # An empty string is normalised to None so no AUTH is attempted.
        super().__init__(host=self.host, port=self.port, db=self.db, password=password or None)
        # A fresh list per instance instead of a shared mutable default.
        self.channels = [] if channels is None else channels
        # Always create the PubSub object so the pull_* helpers never hit a
        # missing attribute; subscribing still only happens when channels
        # were supplied.
        #
        # dir(self.pubsub()):
        # ['HEALTH_CHECK_MESSAGE', 'PUBLISH_MESSAGE_TYPES', 'UNSUBSCRIBE_MESSAGE_TYPES', 'channels', 'check_health',
        #  'close', 'connection', 'connection_pool', 'encoder', 'execute_command', 'get_message', 'handle_message',
        #  'health_check_response', 'ignore_subscribe_messages', 'listen', 'on_connect', 'parse_response', 'patterns',
        #  'pending_unsubscribe_channels', 'pending_unsubscribe_patterns', 'ping', 'psubscribe', 'punsubscribe',
        #  'reset', 'run_in_thread', 'shard_hint', 'subscribe', 'subscribed', 'unsubscribe']
        self.channel_pubsub = self.pubsub()
        if self.channels:
            self.channel_pubsub.psubscribe(self.channels)

    def get_config(self):
        """Return the server configuration (CONFIG GET via ``config_get``)."""
        return self.config_get()

    def set_config(self, name, value):
        """Set one server configuration entry (CONFIG SET via ``config_set``)."""
        return self.config_set(name=name, value=value)

    def get_elements(self, key):
        """Return every element of the list at ``key``.

        The elements are returned exactly as ``lrange`` yields them; they are
        NOT unpickled here.
        """
        return self.lrange(key, 0, -1)

    def get_elements_length(self, key):
        """Return the length of the list at ``key``."""
        return self.llen(key)

    def pop_element(self, key, position='RightEnd'):
        """Pop one element from the list at ``key`` and unpickle it.

        Args:
            key: Name of the list.
            position: End to pop from: ``'LeftEnd'`` or ``'RightEnd'``.

        Returns:
            The unpickled element, or ``None`` when nothing was popped.

        Raises:
            ValueError: If ``position`` is not one of the two supported values.
        """
        command = self._POP_COMMANDS.get(position)
        if command is None:
            raise ValueError(f"position must be 'LeftEnd' or 'RightEnd', got {position!r}")
        pickle_data = getattr(self, command)(key)
        return pickle.loads(pickle_data) if pickle_data else None

    def add_element(self, key, element, position='RightEnd'):
        """Pickle ``element`` and push it onto the list at ``key``.

        Args:
            key: Name of the list.
            element: Any picklable object.
            position: End to push to: ``'LeftEnd'`` or ``'RightEnd'``.

        Returns:
            Whatever the underlying ``lpush``/``rpush`` call returns.

        Raises:
            ValueError: If ``position`` is not one of the two supported values.
        """
        command = self._PUSH_COMMANDS.get(position)
        if command is None:
            raise ValueError(f"position must be 'LeftEnd' or 'RightEnd', got {position!r}")
        return getattr(self, command)(key, pickle.dumps(element))

    def set_one(self, key, data, expire_time=None):
        """Pickle ``data`` and store it at ``key``.

        ``expire_time`` is passed to ``set`` as ``ex``; ``None`` sets no expiry.
        """
        return self.set(key, pickle.dumps(data), ex=expire_time)

    def get_one(self, key):
        """Read ``key`` and unpickle its value; ``None`` if nothing is stored."""
        pickle_data = self.get(key)
        return pickle.loads(pickle_data) if pickle_data else None

    def push_one(self, channel, data):
        """Pickle ``data`` and publish it on ``channel``."""
        return self.publish(channel, pickle.dumps(data))

    def _pull_matching(self, channels):
        """Fetch at most one pending pub/sub message and unpickle its payload.

        Only messages that carry a ``pattern`` (i.e. delivered through a
        pattern subscription) and whose channel is in ``channels`` are
        returned. Any other message fetched by this call is consumed and
        dropped, and ``None`` is returned.

        Example shape of a message from ``get_message()``:
            {'type': 'pmessage', 'pattern': b'cctv1', 'channel': b'cctv1', 'data': b'...'}
        """
        message = self.channel_pubsub.get_message()
        wanted = {name.encode() for name in channels}
        if not message or not message.get('pattern'):
            return None
        if message.get('channel') not in wanted:
            return None
        return pickle.loads(message.get('data'))

    def pull_one(self, channel):
        """Pull one message for ``channel``; same as ``pull_one_by_channel``."""
        return self._pull_matching([channel])

    def pull_one_by_channel(self, channel):
        """Pull one message if the next pending message is on ``channel``.

        Returns the unpickled payload, or ``None`` when there is no pending
        message or it belongs to a different channel.
        """
        return self._pull_matching([channel])

    def pull_one_by_channels(self):
        """Pull one message on any of the channels given at construction."""
        return self._pull_matching(self.channels)

    def pull_one_by_channels_v2(self, channels):
        """Pull one message on any of the explicitly given ``channels``."""
        return self._pull_matching(channels)

    def pull_one_by_channels_v3(self, channels):
        """Subscribe to ``channels`` and then pull one message (poor performance).

        ``psubscribe`` is issued on every call, which is why this variant is
        slow compared with subscribing once in the constructor.
        """
        self.channel_pubsub.psubscribe(channels)
        return self._pull_matching(channels)

    def print_all(self, show_values=False):
        """Print the type and name of every key in the current database.

        Args:
            show_values: When true, also read and print each key's value.
                The default prints only type and key, without fetching values
                from the server.
        """
        # One reader per key type; unknown types print a value of None.
        readers = {
            b'string': self.get,
            b'list': lambda name: self.lrange(name, 0, -1),
            b'set': self.smembers,
            b'zset': lambda name: self.zrange(name, 0, -1),
            b'hash': self.hgetall,
            b'stream': lambda name: self.xread({name: b"0-0"}),
        }
        for key in self.keys():
            key_type = self.type(key)
            if show_values:
                reader = readers.get(key_type)
                value = reader(key) if reader else None
                print(f"type: {key_type} | key: {key} | value: {value}")
            else:
                print(f"type: {key_type} | key: {key}")

    def filter_keys(self):
        """Return the string-typed keys whose name contains ``warn``."""
        # Let the server do the substring match so only candidates are
        # transferred and type-checked.
        return [key for key in self.keys(pattern='*warn*') if self.type(key) == b'string']

    def test(self):
        """Debug helper: print every message received on ``self.channels``.

        Loops forever and polls without blocking, so it spins while idle.
        """
        while True:
            msg = self.pull_one_by_channels()
            if msg is not None:
                print(msg)
if __name__ == '__main__':
    # --- connection ---
    # Other endpoints that have been used with this script:
    # db0 = Client(host='192.168.30.49', port=7070, channels=['cctv1', 'cctv2'])
    # db0 = Client(host='192.168.1.190', port=7070)
    # db0 = Client(host='fra-middleware-redis', port=6379)
    db0 = Client(host='192.168.30.59', port=7070)

    # --- live check: length of the 'tasklist1' list ---
    task_count = db0.get_elements_length('tasklist1')
    print(task_count)

    # ------------------------------------------------------------------
    # Everything below is disabled scratch code kept for reference.
    # ------------------------------------------------------------------

    # --- server configuration ---
    # db0.set_config('client-output-buffer-limit', 'normal 0 0 0 slave 268435456 67108864 60 pubsub 0 0 0')
    # out = db0.get_config()
    # print(out)

    # --- publish one message ---
    # db0.push_one('cctv1', 'asdfasdfasdf')

    # --- store / load an object ---
    # NOTE(review): Client in this file defines set_one/get_one, not
    # set_object/get_object; these two snippets look stale.
    # db0.set_object('compare_face_task', [1, 2, 3])
    # out = db0.get_object('compare_face_task')
    # print(out)

    # --- hash set / get all ---
    # db0.hset(name='test001', key='test002', value='test003')
    # out = db0.hgetall(name='test001')
    # print(out)

    # --- dump all keys ---
    # db0.print_all()

    # --- flush the current database ---
    # db0.flushdb()

    # --- read and unpickle 'real_cam' ---
    # import pickle
    # out = db0.get('real_cam')
    # out = pickle.loads(out)
    # print(out)

    # --- read and unpickle 'webcam' ---
    # import pickle
    # out = db0.get('webcam')
    # out = pickle.loads(out)
    # print(out)

    # --- pickle round trip through 'webcam' with an expiry ---
    # import pickle
    # config = {
    #     'aass': 12,
    #     '1': {
    #         '2': {
    #         }
    #     }
    # }
    #
    # config = pickle.dumps(config)
    # db0.set('webcam', config, ex=1000)
    # out = db0.get('webcam')
    # out = pickle.loads(out)
    # print(out)

    # --- list 'warn' keys and read the first one ---
    # out = db0.filter_keys()
    # print(out)
    # out = db0.get(out[0])
    # print(out)

    # --- delete individual 'warn' keys ---
    # db0.delete('warn:192.168.1.68:1604928475')
    # db0.delete('warn:192.168.1.68:1604928695')
    # db0.delete('warn:192.168.1.68:1604928479')
    # db0.delete('warn:192.168.1.68:1604928680')
    # db0.delete('warn:192.168.1.68:1604928491')
    # db0.print_all()

    # --- store a pickled 0 under a 'warn' key with an expiry ---
    # import pickle
    # data = pickle.dumps(0)
    # db0.set('warn:192.168.1.68:<last_at>', data, ex=1000*86400)
    # out = db0.get(f'warn:192.168.1.68:<last_at>')
    # print(out)

    # --- hash with multiple fields ---
    # import pickle
    # data = {
    #     'last_at': 112233,
    #     'image': pickle.dumps({}),
    # }
    # db0.hmset(name='192.168.1.68:admin:HuaWei123:<find_at>', mapping=data)
    # out = db0.hmget(name='192.168.1.68:admin:HuaWei123:<find_at>', keys='last_at')
    # print(out)
    # out = db0.hget(name='192.168.1.68:admin:HuaWei123:<find_at>', key='last_at')
    # print(out)

    # --- raw bytes value: inspect the returned type ---
    # db0.set('food', b'mutton', ex=1000)
    # print(type(db0.get('food')))
    # print(repr(db0.get('food')))
    # db0.print_all()

    # --- read a value into a numpy buffer ---
    # data = db0.get('192.168.1.68:admin:HuaWei123')
    # import numpy as np
    # data = np.frombuffer(data)
    # print(repr(data))

    # --- unpickle a value and display it with OpenCV ---
    # data = db0.get('192.168.1.68:admin:HuaWei123')
    # import pickle
    # data = pickle.loads(data)
    # import cv2
    # cv2.imshow('data', data)
    # cv2.waitKey(0)
|