# xredis.py
# update: 2021-7-14-11
  2. import redis
  3. import pickle
  4. class Client(redis.StrictRedis):
  5. def __init__(self, host='fra-middleware-redis', port=6379, db=0, password='', channels=[]):
  6. """
  7. 内部访问:
  8. host: fra-middleware-redis
  9. port: 6379
  10. 远程访问:
  11. host: 118.190.217.96、192.168.20.162
  12. port: 7070
  13. """
  14. self.host = host
  15. self.port = port
  16. self.db = db
  17. super().__init__(host=self.host, port=self.port, db=self.db)
  18. self.channels = channels
  19. if self.channels:
  20. """
  21. dir(self.pubsub()):
  22. ['HEALTH_CHECK_MESSAGE', 'PUBLISH_MESSAGE_TYPES', 'UNSUBSCRIBE_MESSAGE_TYPES', 'channels', 'check_health',
  23. 'close', 'connection', 'connection_pool', 'encoder', 'execute_command', 'get_message', 'handle_message',
  24. 'health_check_response', 'ignore_subscribe_messages', 'listen', 'on_connect', 'parse_response', 'patterns',
  25. 'pending_unsubscribe_channels', 'pending_unsubscribe_patterns', 'ping', 'psubscribe', 'punsubscribe',
  26. 'reset', 'run_in_thread', 'shard_hint', 'subscribe', 'subscribed', 'unsubscribe']
  27. """
  28. self.channel_pubsub = self.pubsub()
  29. self.channel_pubsub.psubscribe(channels)
  30. # self.channel_listen = self.channel_pubsub.listen()
  31. def get_config(self):
  32. """
  33. CONFIG GET|SET: 分别用 config_get 和 config_set 实现。
  34. """
  35. return self.config_get()
  36. def set_config(self, name, value):
  37. """
  38. CONFIG GET|SET: 分别用 config_get 和 config_set 实现。
  39. """
  40. return self.config_set(name=name, value=value)
  41. def get_elements(self, key):
  42. """获取列表元素"""
  43. return self.lrange(key, 0, -1)
  44. def get_elements_length(self, key):
  45. """获取列表长度"""
  46. return self.llen(key)
  47. def pop_element(self, key, position='RightEnd'):
  48. """
  49. 弹出元素
  50. position: 出队位置 LeftEnd 左端 RightEnd 右端
  51. """
  52. _dict = {
  53. 'LeftEnd': 'lpop',
  54. 'RightEnd': 'rpop',
  55. }
  56. func = getattr(self, _dict.get(position))
  57. pickle_data = func(key)
  58. element = pickle.loads(pickle_data) if pickle_data else None
  59. return element
  60. def add_element(self, key, element, position='RightEnd'):
  61. """
  62. 增加元素
  63. position: 入队位置 LeftEnd 左端 RightEnd 右端
  64. """
  65. _dict = {
  66. 'LeftEnd': 'lpush',
  67. 'RightEnd': 'rpush',
  68. }
  69. func = getattr(self, _dict.get(position))
  70. pickle_data = pickle.dumps(element)
  71. elements_length = func(key, pickle_data)
  72. return elements_length
  73. def set_one(self, key, data, expire_time=None):
  74. """压缩存储"""
  75. pickle_data = pickle.dumps(data)
  76. return self.set(key, pickle_data, ex=expire_time)
  77. def get_one(self, key):
  78. """解压获取"""
  79. pickle_data = self.get(key)
  80. return pickle.loads(pickle_data) if pickle_data else None
  81. def push_one(self, channel, data):
  82. """
  83. 推送一条
  84. """
  85. return self.publish(channel, pickle.dumps(data))
  86. def pull_one(self, channel):
  87. """
  88. 拉取一条
  89. msg = self.channel_pubsub.parse_response(block=False, timeout=60)
  90. msg = self.channel_pubsub.parse_response(block=False)
  91. [b'pmessage', b'cctv1', b'cctv1', b'3']
  92. msg = self.channel_pubsub.get_message()
  93. {'type': 'pmessage', 'pattern': b'cctv1', 'channel': b'cctv1', 'data': b'asdfasdfasdf'}
  94. """
  95. message = self.channel_pubsub.get_message()
  96. channel = channel.encode()
  97. if message and message.get('pattern') and message.get('channel') == channel:
  98. return pickle.loads(message.get('data'))
  99. def pull_one_by_channel(self, channel):
  100. """
  101. 拉取一条
  102. """
  103. message = self.channel_pubsub.get_message()
  104. channel = channel.encode()
  105. if message and message.get('pattern') and message.get('channel') == channel:
  106. return pickle.loads(message.get('data'))
  107. def pull_one_by_channels(self):
  108. """
  109. 拉取一条
  110. """
  111. message = self.channel_pubsub.get_message()
  112. channels = [i.encode() for i in self.channels]
  113. if message and message.get('pattern') and message.get('channel') in channels:
  114. return pickle.loads(message.get('data'))
  115. def pull_one_by_channels_v2(self, channels):
  116. """
  117. 拉取一条
  118. """
  119. message = self.channel_pubsub.get_message()
  120. channels = [i.encode() for i in channels]
  121. if message and message.get('pattern') and message.get('channel') in channels:
  122. return pickle.loads(message.get('data'))
  123. def pull_one_by_channels_v3(self, channels):
  124. """
  125. 拉取一条(性能不佳)
  126. """
  127. self.channel_pubsub.psubscribe(channels)
  128. message = self.channel_pubsub.get_message()
  129. channels = [i.encode() for i in channels]
  130. if message and message.get('pattern') and message.get('channel') in channels:
  131. return pickle.loads(message.get('data'))
  132. def print_all(self):
  133. """打印"""
  134. for key in self.keys():
  135. key_type = self.type(key)
  136. # if key_type != b"stream":
  137. # continue
  138. if key_type == b'string':
  139. value = self.get(key)
  140. elif key_type == b'list':
  141. value = self.lrange(key, 0, -1)
  142. elif key_type == b'set':
  143. value = self.smembers(key)
  144. elif key_type == b'zset':
  145. value = self.zrange(key, 0, -1)
  146. elif key_type == b"hash":
  147. value = self.hgetall(key)
  148. elif key_type == b"stream":
  149. value = self.xread({key: b"0-0"})
  150. else:
  151. value = None
  152. print(f"type: {key_type} | key: {key}")
  153. # print(f"type: {key_type} | key: {key} | value: {value}")
  154. def filter_keys(self):
  155. re_list = []
  156. for key in self.keys():
  157. if self.type(key) != b'string':
  158. continue
  159. if b'warn' not in key:
  160. continue
  161. re_list.append(key)
  162. return re_list
  163. def test(self):
  164. while True:
  165. msg = self.pull()
  166. if msg:
  167. print(msg)
  168. if __name__ == '__main__':
  169. # --- init ---
  170. # db0 = Client(host='192.168.30.49', port=7070, channels=['cctv1', 'cctv2'])
  171. db0 = Client(host='192.168.30.59', port=7070)
  172. # db0 = Client(host='192.168.1.190', port=7070)
  173. # db0 = Client(host='fra-middleware-redis', port=6379)
  174. # --- test ---
  175. out = db0.get_elements_length('tasklist1')
  176. print(out)
  177. # --- test --
  178. # db0.set_config('client-output-buffer-limit', 'normal 0 0 0 slave 268435456 67108864 60 pubsub 0 0 0')
  179. # out = db0.get_config()
  180. # print(out)
  181. # --- test ---
  182. # db0.push_one('cctv1', 'asdfasdfasdf')
  183. # --- test ---
  184. # db0.set_object('compare_face_task', [1, 2, 3])
  185. # --- test ---
  186. # out = db0.get_object('compare_face_task')
  187. # print(out)
  188. # --- test ---
  189. # db0.hset(name='test001', key='test002', value='test003')
  190. # out = db0.hgetall(name='test001')
  191. # print(out)
  192. # --- test ---
  193. # db0.print_all()
  194. # --- test ---
  195. # db0.flushdb()
  196. # --- test ---
  197. # import pickle
  198. # out = db0.get('real_cam')
  199. # out = pickle.loads(out)
  200. # print(out)
  201. # --- test ---
  202. # import pickle
  203. # out = db0.get('webcam')
  204. # out = pickle.loads(out)
  205. # print(out)
  206. # --- test ---
  207. # import pickle
  208. # config = {
  209. # 'aass': 12,
  210. # '1': {
  211. # '2': {
  212. # }
  213. # }
  214. # }
  215. #
  216. # config = pickle.dumps(config)
  217. # db0.set('webcam', config, ex=1000)
  218. # out = db0.get('webcam')
  219. # out = pickle.loads(out)
  220. # print(out)
  221. # --- test ---
  222. # out = db0.filter_keys()
  223. # print(out)
  224. # out = db0.get(out[0])
  225. # print(out)
  226. # --- test ---
  227. # db0.delete('warn:192.168.1.68:1604928475')
  228. # db0.delete('warn:192.168.1.68:1604928695')
  229. # db0.delete('warn:192.168.1.68:1604928479')
  230. # db0.delete('warn:192.168.1.68:1604928680')
  231. # db0.delete('warn:192.168.1.68:1604928491')
  232. # db0.print_all()
  233. # --- test ---
  234. # import pickle
  235. # data = pickle.dumps(0)
  236. # db0.set('warn:192.168.1.68:<last_at>', data, ex=1000*86400)
  237. # out = db0.get(f'warn:192.168.1.68:<last_at>')
  238. # print(out)
  239. # --- test ---
  240. # import pickle
  241. # data = {
  242. # 'last_at': 112233,
  243. # 'image': pickle.dumps({}),
  244. # }
  245. # db0.hmset(name='192.168.1.68:admin:HuaWei123:<find_at>', mapping=data)
  246. # out = db0.hmget(name='192.168.1.68:admin:HuaWei123:<find_at>', keys='last_at')
  247. # print(out)
  248. # out = db0.hget(name='192.168.1.68:admin:HuaWei123:<find_at>', key='last_at')
  249. # print(out)
  250. # --- test ---
  251. # db0.set('food', b'mutton', ex=1000)
  252. # print(type(db0.get('food')))
  253. # print(repr(db0.get('food')))
  254. # db0.print_all()
  255. # --- test ---
  256. # data = db0.get('192.168.1.68:admin:HuaWei123')
  257. # import numpy as np
  258. # data = np.frombuffer(data)
  259. # print(repr(data))
  260. # --- test ---
  261. # data = db0.get('192.168.1.68:admin:HuaWei123')
  262. # import pickle
  263. # data = pickle.loads(data)
  264. # import cv2
  265. # cv2.imshow('data', data)
  266. # cv2.waitKey(0)