test_memmap.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import sys
  2. import os
  3. import mmap
  4. import pytest
  5. from pathlib import Path
  6. from tempfile import NamedTemporaryFile, TemporaryFile
  7. from numpy import (
  8. memmap, sum, average, product, ndarray, isscalar, add, subtract, multiply)
  9. from numpy import arange, allclose, asarray
  10. from numpy.testing import (
  11. assert_, assert_equal, assert_array_equal, suppress_warnings, IS_PYPY,
  12. break_cycles
  13. )
  14. class TestMemmap:
  15. def setup_method(self):
  16. self.tmpfp = NamedTemporaryFile(prefix='mmap')
  17. self.shape = (3, 4)
  18. self.dtype = 'float32'
  19. self.data = arange(12, dtype=self.dtype)
  20. self.data.resize(self.shape)
  21. def teardown_method(self):
  22. self.tmpfp.close()
  23. self.data = None
  24. if IS_PYPY:
  25. break_cycles()
  26. break_cycles()
  27. def test_roundtrip(self):
  28. # Write data to file
  29. fp = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  30. shape=self.shape)
  31. fp[:] = self.data[:]
  32. del fp # Test __del__ machinery, which handles cleanup
  33. # Read data back from file
  34. newfp = memmap(self.tmpfp, dtype=self.dtype, mode='r',
  35. shape=self.shape)
  36. assert_(allclose(self.data, newfp))
  37. assert_array_equal(self.data, newfp)
  38. assert_equal(newfp.flags.writeable, False)
  39. def test_open_with_filename(self, tmp_path):
  40. tmpname = tmp_path / 'mmap'
  41. fp = memmap(tmpname, dtype=self.dtype, mode='w+',
  42. shape=self.shape)
  43. fp[:] = self.data[:]
  44. del fp
  45. def test_unnamed_file(self):
  46. with TemporaryFile() as f:
  47. fp = memmap(f, dtype=self.dtype, shape=self.shape)
  48. del fp
  49. def test_attributes(self):
  50. offset = 1
  51. mode = "w+"
  52. fp = memmap(self.tmpfp, dtype=self.dtype, mode=mode,
  53. shape=self.shape, offset=offset)
  54. assert_equal(offset, fp.offset)
  55. assert_equal(mode, fp.mode)
  56. del fp
  57. def test_filename(self, tmp_path):
  58. tmpname = tmp_path / "mmap"
  59. fp = memmap(tmpname, dtype=self.dtype, mode='w+',
  60. shape=self.shape)
  61. abspath = Path(os.path.abspath(tmpname))
  62. fp[:] = self.data[:]
  63. assert_equal(abspath, fp.filename)
  64. b = fp[:1]
  65. assert_equal(abspath, b.filename)
  66. del b
  67. del fp
  68. def test_path(self, tmp_path):
  69. tmpname = tmp_path / "mmap"
  70. fp = memmap(Path(tmpname), dtype=self.dtype, mode='w+',
  71. shape=self.shape)
  72. # os.path.realpath does not resolve symlinks on Windows
  73. # see: https://bugs.python.org/issue9949
  74. # use Path.resolve, just as memmap class does internally
  75. abspath = str(Path(tmpname).resolve())
  76. fp[:] = self.data[:]
  77. assert_equal(abspath, str(fp.filename.resolve()))
  78. b = fp[:1]
  79. assert_equal(abspath, str(b.filename.resolve()))
  80. del b
  81. del fp
  82. def test_filename_fileobj(self):
  83. fp = memmap(self.tmpfp, dtype=self.dtype, mode="w+",
  84. shape=self.shape)
  85. assert_equal(fp.filename, self.tmpfp.name)
  86. @pytest.mark.skipif(sys.platform == 'gnu0',
  87. reason="Known to fail on hurd")
  88. def test_flush(self):
  89. fp = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  90. shape=self.shape)
  91. fp[:] = self.data[:]
  92. assert_equal(fp[0], self.data[0])
  93. fp.flush()
  94. def test_del(self):
  95. # Make sure a view does not delete the underlying mmap
  96. fp_base = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  97. shape=self.shape)
  98. fp_base[0] = 5
  99. fp_view = fp_base[0:1]
  100. assert_equal(fp_view[0], 5)
  101. del fp_view
  102. # Should still be able to access and assign values after
  103. # deleting the view
  104. assert_equal(fp_base[0], 5)
  105. fp_base[0] = 6
  106. assert_equal(fp_base[0], 6)
  107. def test_arithmetic_drops_references(self):
  108. fp = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  109. shape=self.shape)
  110. tmp = (fp + 10)
  111. if isinstance(tmp, memmap):
  112. assert_(tmp._mmap is not fp._mmap)
  113. def test_indexing_drops_references(self):
  114. fp = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  115. shape=self.shape)
  116. tmp = fp[(1, 2), (2, 3)]
  117. if isinstance(tmp, memmap):
  118. assert_(tmp._mmap is not fp._mmap)
  119. def test_slicing_keeps_references(self):
  120. fp = memmap(self.tmpfp, dtype=self.dtype, mode='w+',
  121. shape=self.shape)
  122. assert_(fp[:2, :2]._mmap is fp._mmap)
  123. def test_view(self):
  124. fp = memmap(self.tmpfp, dtype=self.dtype, shape=self.shape)
  125. new1 = fp.view()
  126. new2 = new1.view()
  127. assert_(new1.base is fp)
  128. assert_(new2.base is fp)
  129. new_array = asarray(fp)
  130. assert_(new_array.base is fp)
  131. def test_ufunc_return_ndarray(self):
  132. fp = memmap(self.tmpfp, dtype=self.dtype, shape=self.shape)
  133. fp[:] = self.data
  134. with suppress_warnings() as sup:
  135. sup.filter(FutureWarning, "np.average currently does not preserve")
  136. for unary_op in [sum, average, product]:
  137. result = unary_op(fp)
  138. assert_(isscalar(result))
  139. assert_(result.__class__ is self.data[0, 0].__class__)
  140. assert_(unary_op(fp, axis=0).__class__ is ndarray)
  141. assert_(unary_op(fp, axis=1).__class__ is ndarray)
  142. for binary_op in [add, subtract, multiply]:
  143. assert_(binary_op(fp, self.data).__class__ is ndarray)
  144. assert_(binary_op(self.data, fp).__class__ is ndarray)
  145. assert_(binary_op(fp, fp).__class__ is ndarray)
  146. fp += 1
  147. assert(fp.__class__ is memmap)
  148. add(fp, 1, out=fp)
  149. assert(fp.__class__ is memmap)
  150. def test_getitem(self):
  151. fp = memmap(self.tmpfp, dtype=self.dtype, shape=self.shape)
  152. fp[:] = self.data
  153. assert_(fp[1:, :-1].__class__ is memmap)
  154. # Fancy indexing returns a copy that is not memmapped
  155. assert_(fp[[0, 1]].__class__ is ndarray)
  156. def test_memmap_subclass(self):
  157. class MemmapSubClass(memmap):
  158. pass
  159. fp = MemmapSubClass(self.tmpfp, dtype=self.dtype, shape=self.shape)
  160. fp[:] = self.data
  161. # We keep previous behavior for subclasses of memmap, i.e. the
  162. # ufunc and __getitem__ output is never turned into a ndarray
  163. assert_(sum(fp, axis=0).__class__ is MemmapSubClass)
  164. assert_(sum(fp).__class__ is MemmapSubClass)
  165. assert_(fp[1:, :-1].__class__ is MemmapSubClass)
  166. assert(fp[[0, 1]].__class__ is MemmapSubClass)
  167. def test_mmap_offset_greater_than_allocation_granularity(self):
  168. size = 5 * mmap.ALLOCATIONGRANULARITY
  169. offset = mmap.ALLOCATIONGRANULARITY + 1
  170. fp = memmap(self.tmpfp, shape=size, mode='w+', offset=offset)
  171. assert_(fp.offset == offset)
  172. def test_no_shape(self):
  173. self.tmpfp.write(b'a'*16)
  174. mm = memmap(self.tmpfp, dtype='float64')
  175. assert_equal(mm.shape, (2,))
  176. def test_empty_array(self):
  177. # gh-12653
  178. with pytest.raises(ValueError, match='empty file'):
  179. memmap(self.tmpfp, shape=(0,4), mode='w+')
  180. self.tmpfp.write(b'\0')
  181. # ok now the file is not empty
  182. memmap(self.tmpfp, shape=(0,4), mode='w+')