# test_mapped_queue.py -- unit tests for networkx.utils.mapped_queue
  1. import pytest
  2. from networkx.utils.mapped_queue import MappedQueue, _HeapElement
  3. def test_HeapElement_gtlt():
  4. bar = _HeapElement(1.1, "a")
  5. foo = _HeapElement(1, "b")
  6. assert foo < bar
  7. assert bar > foo
  8. assert foo < 1.1
  9. assert 1 < bar
  10. def test_HeapElement_gtlt_tied_priority():
  11. bar = _HeapElement(1, "a")
  12. foo = _HeapElement(1, "b")
  13. assert foo > bar
  14. assert bar < foo
  15. def test_HeapElement_eq():
  16. bar = _HeapElement(1.1, "a")
  17. foo = _HeapElement(1, "a")
  18. assert foo == bar
  19. assert bar == foo
  20. assert foo == "a"
  21. def test_HeapElement_iter():
  22. foo = _HeapElement(1, "a")
  23. bar = _HeapElement(1.1, (3, 2, 1))
  24. assert list(foo) == [1, "a"]
  25. assert list(bar) == [1.1, 3, 2, 1]
  26. def test_HeapElement_getitem():
  27. foo = _HeapElement(1, "a")
  28. bar = _HeapElement(1.1, (3, 2, 1))
  29. assert foo[1] == "a"
  30. assert foo[0] == 1
  31. assert bar[0] == 1.1
  32. assert bar[2] == 2
  33. assert bar[3] == 1
  34. pytest.raises(IndexError, bar.__getitem__, 4)
  35. pytest.raises(IndexError, foo.__getitem__, 2)
  36. class TestMappedQueue:
  37. def setup_method(self):
  38. pass
  39. def _check_map(self, q):
  40. assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
  41. def _make_mapped_queue(self, h):
  42. q = MappedQueue()
  43. q.heap = h
  44. q.position = {elt: pos for pos, elt in enumerate(h)}
  45. return q
  46. def test_heapify(self):
  47. h = [5, 4, 3, 2, 1, 0]
  48. q = self._make_mapped_queue(h)
  49. q._heapify()
  50. self._check_map(q)
  51. def test_init(self):
  52. h = [5, 4, 3, 2, 1, 0]
  53. q = MappedQueue(h)
  54. self._check_map(q)
  55. def test_incomparable(self):
  56. h = [5, 4, "a", 2, 1, 0]
  57. pytest.raises(TypeError, MappedQueue, h)
  58. def test_len(self):
  59. h = [5, 4, 3, 2, 1, 0]
  60. q = MappedQueue(h)
  61. self._check_map(q)
  62. assert len(q) == 6
  63. def test_siftup_leaf(self):
  64. h = [2]
  65. h_sifted = [2]
  66. q = self._make_mapped_queue(h)
  67. q._siftup(0)
  68. assert q.heap == h_sifted
  69. self._check_map(q)
  70. def test_siftup_one_child(self):
  71. h = [2, 0]
  72. h_sifted = [0, 2]
  73. q = self._make_mapped_queue(h)
  74. q._siftup(0)
  75. assert q.heap == h_sifted
  76. self._check_map(q)
  77. def test_siftup_left_child(self):
  78. h = [2, 0, 1]
  79. h_sifted = [0, 2, 1]
  80. q = self._make_mapped_queue(h)
  81. q._siftup(0)
  82. assert q.heap == h_sifted
  83. self._check_map(q)
  84. def test_siftup_right_child(self):
  85. h = [2, 1, 0]
  86. h_sifted = [0, 1, 2]
  87. q = self._make_mapped_queue(h)
  88. q._siftup(0)
  89. assert q.heap == h_sifted
  90. self._check_map(q)
  91. def test_siftup_multiple(self):
  92. h = [0, 1, 2, 4, 3, 5, 6]
  93. h_sifted = [0, 1, 2, 4, 3, 5, 6]
  94. q = self._make_mapped_queue(h)
  95. q._siftup(0)
  96. assert q.heap == h_sifted
  97. self._check_map(q)
  98. def test_siftdown_leaf(self):
  99. h = [2]
  100. h_sifted = [2]
  101. q = self._make_mapped_queue(h)
  102. q._siftdown(0, 0)
  103. assert q.heap == h_sifted
  104. self._check_map(q)
  105. def test_siftdown_single(self):
  106. h = [1, 0]
  107. h_sifted = [0, 1]
  108. q = self._make_mapped_queue(h)
  109. q._siftdown(0, len(h) - 1)
  110. assert q.heap == h_sifted
  111. self._check_map(q)
  112. def test_siftdown_multiple(self):
  113. h = [1, 2, 3, 4, 5, 6, 7, 0]
  114. h_sifted = [0, 1, 3, 2, 5, 6, 7, 4]
  115. q = self._make_mapped_queue(h)
  116. q._siftdown(0, len(h) - 1)
  117. assert q.heap == h_sifted
  118. self._check_map(q)
  119. def test_push(self):
  120. to_push = [6, 1, 4, 3, 2, 5, 0]
  121. h_sifted = [0, 2, 1, 6, 3, 5, 4]
  122. q = MappedQueue()
  123. for elt in to_push:
  124. q.push(elt)
  125. assert q.heap == h_sifted
  126. self._check_map(q)
  127. def test_push_duplicate(self):
  128. to_push = [2, 1, 0]
  129. h_sifted = [0, 2, 1]
  130. q = MappedQueue()
  131. for elt in to_push:
  132. inserted = q.push(elt)
  133. assert inserted
  134. assert q.heap == h_sifted
  135. self._check_map(q)
  136. inserted = q.push(1)
  137. assert not inserted
  138. def test_pop(self):
  139. h = [3, 4, 6, 0, 1, 2, 5]
  140. h_sorted = sorted(h)
  141. q = self._make_mapped_queue(h)
  142. q._heapify()
  143. popped = [q.pop() for _ in range(len(h))]
  144. assert popped == h_sorted
  145. self._check_map(q)
  146. def test_remove_leaf(self):
  147. h = [0, 2, 1, 6, 3, 5, 4]
  148. h_removed = [0, 2, 1, 6, 4, 5]
  149. q = self._make_mapped_queue(h)
  150. removed = q.remove(3)
  151. assert q.heap == h_removed
  152. def test_remove_root(self):
  153. h = [0, 2, 1, 6, 3, 5, 4]
  154. h_removed = [1, 2, 4, 6, 3, 5]
  155. q = self._make_mapped_queue(h)
  156. removed = q.remove(0)
  157. assert q.heap == h_removed
  158. def test_update_leaf(self):
  159. h = [0, 20, 10, 60, 30, 50, 40]
  160. h_updated = [0, 15, 10, 60, 20, 50, 40]
  161. q = self._make_mapped_queue(h)
  162. removed = q.update(30, 15)
  163. assert q.heap == h_updated
  164. def test_update_root(self):
  165. h = [0, 20, 10, 60, 30, 50, 40]
  166. h_updated = [10, 20, 35, 60, 30, 50, 40]
  167. q = self._make_mapped_queue(h)
  168. removed = q.update(0, 35)
  169. assert q.heap == h_updated
  170. class TestMappedDict(TestMappedQueue):
  171. def _make_mapped_queue(self, h):
  172. priority_dict = {elt: elt for elt in h}
  173. return MappedQueue(priority_dict)
  174. def test_init(self):
  175. d = {5: 0, 4: 1, "a": 2, 2: 3, 1: 4}
  176. q = MappedQueue(d)
  177. assert q.position == d
  178. def test_ties(self):
  179. d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4}
  180. q = MappedQueue(d)
  181. assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
  182. def test_pop(self):
  183. d = {5: 0, 4: 1, 3: 2, 2: 3, 1: 4}
  184. q = MappedQueue(d)
  185. assert q.pop() == _HeapElement(0, 5)
  186. assert q.position == {elt: pos for pos, elt in enumerate(q.heap)}
  187. def test_empty_pop(self):
  188. q = MappedQueue()
  189. pytest.raises(IndexError, q.pop)
  190. def test_incomparable_ties(self):
  191. d = {5: 0, 4: 0, "a": 0, 2: 0, 1: 0}
  192. pytest.raises(TypeError, MappedQueue, d)
  193. def test_push(self):
  194. to_push = [6, 1, 4, 3, 2, 5, 0]
  195. h_sifted = [0, 2, 1, 6, 3, 5, 4]
  196. q = MappedQueue()
  197. for elt in to_push:
  198. q.push(elt, priority=elt)
  199. assert q.heap == h_sifted
  200. self._check_map(q)
  201. def test_push_duplicate(self):
  202. to_push = [2, 1, 0]
  203. h_sifted = [0, 2, 1]
  204. q = MappedQueue()
  205. for elt in to_push:
  206. inserted = q.push(elt, priority=elt)
  207. assert inserted
  208. assert q.heap == h_sifted
  209. self._check_map(q)
  210. inserted = q.push(1, priority=1)
  211. assert not inserted
  212. def test_update_leaf(self):
  213. h = [0, 20, 10, 60, 30, 50, 40]
  214. h_updated = [0, 15, 10, 60, 20, 50, 40]
  215. q = self._make_mapped_queue(h)
  216. removed = q.update(30, 15, priority=15)
  217. assert q.heap == h_updated
  218. def test_update_root(self):
  219. h = [0, 20, 10, 60, 30, 50, 40]
  220. h_updated = [10, 20, 35, 60, 30, 50, 40]
  221. q = self._make_mapped_queue(h)
  222. removed = q.update(0, 35, priority=35)
  223. assert q.heap == h_updated