import sys
import os
import gc
import threading

import numpy as np
from numpy.testing import assert_equal, assert_, assert_allclose
from scipy.sparse import (_sparsetools, coo_matrix, csr_matrix, csc_matrix,
                          bsr_matrix, dia_matrix)
from scipy.sparse._sputils import supported_dtypes
from scipy._lib._testutils import check_free_memory

import pytest
from pytest import raises as assert_raises


def int_to_int8(n):
    """
    Wrap an integer to the interval [-128, 127].
    """
    return (n + 128) % 256 - 128
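

# Illustrative sketch (a hypothetical helper, not part of the original test
# suite): int_to_int8 reproduces NumPy's modular int8 wraparound, which is why
# the tests below compare int8 results against int_to_int8(n) after n ones have
# been accumulated.
def _demo_int_to_int8():
    vals = np.array([127, 128, 255, 300, 50000])
    # Casting wraps modulo 256 into the signed int8 range [-128, 127].
    wrapped = vals.astype(np.int8)
    assert np.array_equal(wrapped, [int_to_int8(int(v)) for v in vals])
    # Summing int8 ones with an int8 accumulator wraps the same way.
    assert np.ones(300, dtype=np.int8).sum(dtype=np.int8) == int_to_int8(300)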


def test_exception():
    assert_raises(MemoryError, _sparsetools.test_throw_error)


def test_threads():
    # Smoke test for parallel threaded execution; doesn't actually
    # check that code runs in parallel, but just that it produces
    # expected results.
    nthreads = 10
    niter = 100

    n = 20
    a = csr_matrix(np.ones([n, n]))
    bres = []

    class Worker(threading.Thread):
        def run(self):
            b = a.copy()
            for j in range(niter):
                _sparsetools.csr_plus_csr(n, n,
                                          a.indptr, a.indices, a.data,
                                          a.indptr, a.indices, a.data,
                                          b.indptr, b.indices, b.data)
            bres.append(b)

    threads = [Worker() for _ in range(nthreads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    for b in bres:
        assert_(np.all(b.toarray() == 2))
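

# A minimal sketch of the result the threaded workers above are expected to
# produce, using only the public API (_demo_csr_plus_csr is a hypothetical
# helper, not part of the test suite): adding the all-ones CSR matrix to itself
# gives 2 in every entry, which is what each worker's b is checked against.
def _demo_csr_plus_csr():
    a = csr_matrix(np.ones([4, 4]))
    b = a + a  # public-API counterpart of the raw csr_plus_csr kernel call
    assert np.all(b.toarray() == 2)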


def test_regression_std_vector_dtypes():
    # Regression test for gh-3780, checking the std::vector typemaps
    # in sparsetools.cxx are complete.
    for dtype in supported_dtypes:
        ad = np.array([[1, 2], [3, 4]]).astype(dtype)
        a = csr_matrix(ad, dtype=dtype)

        # getcol is one function using std::vector typemaps, and should not fail
        assert_equal(a.getcol(0).toarray(), ad[:, :1])


@pytest.mark.slow
@pytest.mark.xfail_on_32bit("Can't create large array for test")
def test_nnz_overflow():
    # Regression test for gh-7230 / gh-7871, checking that coo_toarray
    # with nnz > int32max doesn't overflow.
    nnz = np.iinfo(np.int32).max + 1

    # Ensure ~20 GB of RAM is free to run this test.
    check_free_memory((4 + 4 + 1) * nnz / 1e6 + 0.5)

    # Use nnz duplicate entries to keep the dense version small.
    row = np.zeros(nnz, dtype=np.int32)
    col = np.zeros(nnz, dtype=np.int32)
    data = np.zeros(nnz, dtype=np.int8)
    data[-1] = 4
    s = coo_matrix((data, (row, col)), shape=(1, 1), copy=False)
    # Sums nnz duplicates to produce a 1x1 array containing 4.
    d = s.toarray()

    assert_allclose(d, [[4]])
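

# A small sketch of the mechanism test_nnz_overflow relies on
# (_demo_coo_duplicate_sum is a hypothetical helper, not part of the test
# suite): COO entries at the same (row, col) position are summed when the
# matrix is densified, so a huge number of duplicates can still produce a
# tiny dense result.
def _demo_coo_duplicate_sum():
    row = np.zeros(5, dtype=np.int32)
    col = np.zeros(5, dtype=np.int32)
    data = np.array([0, 0, 0, 0, 4], dtype=np.int8)
    s = coo_matrix((data, (row, col)), shape=(1, 1))
    # The five duplicate entries collapse into a single summed value.
    assert_allclose(s.toarray(), [[4]])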


@pytest.mark.skipif(not (sys.platform.startswith('linux')
                         and np.dtype(np.intp).itemsize >= 8),
                    reason="test requires 64-bit Linux")
class TestInt32Overflow:
    """
    Some of the sparsetools routines use dense 2D matrices whose
    total size is not bounded by the nnz of the sparse matrix. These
    routines used to suffer from int32 wraparounds; here, we try to
    check that the wraparounds don't occur any more.
    """
    # choose n large enough
    n = 50000

    def setup_method(self):
        assert self.n**2 > np.iinfo(np.int32).max

        # check there's enough memory even if everything is run at the
        # same time
        try:
            parallel_count = int(os.environ.get('PYTEST_XDIST_WORKER_COUNT', '1'))
        except ValueError:
            parallel_count = np.inf
        check_free_memory(3000 * parallel_count)

    def teardown_method(self):
        gc.collect()

    def test_coo_todense(self):
        # Check *_todense routines (cf. gh-2179)
        #
        # All of them in the end call coo_matrix.todense
        n = self.n
        i = np.array([0, n-1])
        j = np.array([0, n-1])
        data = np.array([1, 2], dtype=np.int8)
        m = coo_matrix((data, (i, j)))

        r = m.todense()
        assert_equal(r[0, 0], 1)
        assert_equal(r[-1, -1], 2)
        del r
        gc.collect()

    @pytest.mark.slow
    def test_matvecs(self):
        # Check *_matvecs routines
        n = self.n
        i = np.array([0, n-1])
        j = np.array([0, n-1])
        data = np.array([1, 2], dtype=np.int8)
        m = coo_matrix((data, (i, j)))

        b = np.ones((n, n), dtype=np.int8)
        for sptype in (csr_matrix, csc_matrix, bsr_matrix):
            m2 = sptype(m)
            r = m2.dot(b)
            assert_equal(r[0, 0], 1)
            assert_equal(r[-1, -1], 2)
            del r
            gc.collect()

        del b
        gc.collect()

    @pytest.mark.slow
    def test_dia_matvec(self):
        # Check: huge dia_matrix _matvec
        n = self.n
        data = np.ones((n, n), dtype=np.int8)
        offsets = np.arange(n)
        m = dia_matrix((data, offsets), shape=(n, n))
        v = np.ones(m.shape[1], dtype=np.int8)
        r = m.dot(v)
        assert_equal(r[0], int_to_int8(n))
        del data, offsets, m, v, r
        gc.collect()

    _bsr_ops = [pytest.param("matmat", marks=pytest.mark.xslow),
                pytest.param("matvecs", marks=pytest.mark.xslow),
                "matvec",
                "diagonal",
                "sort_indices",
                pytest.param("transpose", marks=pytest.mark.xslow)]

    @pytest.mark.slow
    @pytest.mark.parametrize("op", _bsr_ops)
    def test_bsr_1_block(self, op):
        # Check: huge bsr_matrix (1-block)
        #
        # The point here is that indices inside a block may overflow.

        def get_matrix():
            n = self.n
            data = np.ones((1, n, n), dtype=np.int8)
            indptr = np.array([0, 1], dtype=np.int32)
            indices = np.array([0], dtype=np.int32)
            m = bsr_matrix((data, indices, indptr), blocksize=(n, n), copy=False)
            del data, indptr, indices
            return m

        gc.collect()
        try:
            getattr(self, "_check_bsr_" + op)(get_matrix)
        finally:
            gc.collect()

    @pytest.mark.slow
    @pytest.mark.parametrize("op", _bsr_ops)
    def test_bsr_n_block(self, op):
        # Check: huge bsr_matrix (n-block)
        #
        # The point here is that while indices within a block don't
        # overflow, accumulators across many blocks may.

        def get_matrix():
            n = self.n
            data = np.ones((n, n, 1), dtype=np.int8)
            indptr = np.array([0, n], dtype=np.int32)
            indices = np.arange(n, dtype=np.int32)
            m = bsr_matrix((data, indices, indptr), blocksize=(n, 1), copy=False)
            del data, indptr, indices
            return m

        gc.collect()
        try:
            getattr(self, "_check_bsr_" + op)(get_matrix)
        finally:
            gc.collect()

    def _check_bsr_matvecs(self, m):
        m = m()
        n = self.n

        # _matvecs
        r = m.dot(np.ones((n, 2), dtype=np.int8))
        assert_equal(r[0, 0], int_to_int8(n))

    def _check_bsr_matvec(self, m):
        m = m()
        n = self.n

        # _matvec
        r = m.dot(np.ones((n,), dtype=np.int8))
        assert_equal(r[0], int_to_int8(n))

    def _check_bsr_diagonal(self, m):
        m = m()
        n = self.n

        # _diagonal
        r = m.diagonal()
        assert_equal(r, np.ones(n))

    def _check_bsr_sort_indices(self, m):
        # _sort_indices
        m = m()
        m.sort_indices()

    def _check_bsr_transpose(self, m):
        # _transpose
        m = m()
        m.transpose()

    def _check_bsr_matmat(self, m):
        m = m()
        n = self.n

        # _bsr_matmat
        m2 = bsr_matrix(np.ones((n, 2), dtype=np.int8),
                        blocksize=(m.blocksize[1], 2))
        m.dot(m2)  # shouldn't SIGSEGV
        del m2

        # _bsr_matmat
        m2 = bsr_matrix(np.ones((2, n), dtype=np.int8),
                        blocksize=(2, m.blocksize[0]))
        m2.dot(m)  # shouldn't SIGSEGV
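

# A small-scale sketch of the two BSR layouts exercised by test_bsr_1_block and
# test_bsr_n_block (_demo_bsr_layouts is a hypothetical helper, not part of the
# test suite): a single (n, n) block and n blocks of shape (n, 1) both describe
# the same all-ones matrix, so a matvec with a ones vector yields n in every
# entry. In the real tests n is huge and the int8 result wraps to int_to_int8(n).
def _demo_bsr_layouts():
    n = 3
    one_block = bsr_matrix(
        (np.ones((1, n, n), dtype=np.int8),
         np.array([0], dtype=np.int32),
         np.array([0, 1], dtype=np.int32)),
        blocksize=(n, n))
    n_blocks = bsr_matrix(
        (np.ones((n, n, 1), dtype=np.int8),
         np.arange(n, dtype=np.int32),
         np.array([0, n], dtype=np.int32)),
        blocksize=(n, 1))
    dense = np.ones((n, n))
    assert_equal(one_block.toarray(), dense)
    assert_equal(n_blocks.toarray(), dense)
    assert_equal(one_block.dot(np.ones(n, dtype=np.int8)), [n] * n)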


@pytest.mark.skip(reason="64-bit indices in sparse matrices not available")
def test_csr_matmat_int64_overflow():
    n = 3037000500
    assert n**2 > np.iinfo(np.int64).max

    # the test would take crazy amounts of memory
    check_free_memory(n * (8*2 + 1) * 3 / 1e6)

    # int64 overflow
    data = np.ones((n,), dtype=np.int8)
    indptr = np.arange(n+1, dtype=np.int64)
    indices = np.zeros(n, dtype=np.int64)
    a = csr_matrix((data, indices, indptr))
    b = a.T

    assert_raises(RuntimeError, a.dot, b)
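

# Illustrative note on the constant above (_demo_int64_square_overflow is a
# hypothetical helper, not part of the test suite): 3037000500 is the smallest
# integer whose square no longer fits in int64, which is exactly the regime the
# skipped test targets.
def _demo_int64_square_overflow():
    n = 3037000500
    assert (n - 1)**2 <= np.iinfo(np.int64).max < n**2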


def test_upcast():
    a0 = csr_matrix([[np.pi, np.pi*1j], [3, 4]], dtype=complex)
    b0 = np.array([256+1j, 2**32], dtype=complex)

    for a_dtype in supported_dtypes:
        for b_dtype in supported_dtypes:
            msg = "(%r, %r)" % (a_dtype, b_dtype)

            if np.issubdtype(a_dtype, np.complexfloating):
                a = a0.copy().astype(a_dtype)
            else:
                a = a0.real.copy().astype(a_dtype)

            if np.issubdtype(b_dtype, np.complexfloating):
                b = b0.copy().astype(b_dtype)
            else:
                with np.errstate(invalid="ignore"):
                    # Casting a large value (2**32) to int8 causes a warning in
                    # numpy >1.23
                    b = b0.real.copy().astype(b_dtype)

            if not (a_dtype == np.bool_ and b_dtype == np.bool_):
                c = np.zeros((2,), dtype=np.bool_)
                assert_raises(ValueError, _sparsetools.csr_matvec,
                              2, 2, a.indptr, a.indices, a.data, b, c)

            if ((np.issubdtype(a_dtype, np.complexfloating) and
                 not np.issubdtype(b_dtype, np.complexfloating)) or
                (not np.issubdtype(a_dtype, np.complexfloating) and
                 np.issubdtype(b_dtype, np.complexfloating))):
                c = np.zeros((2,), dtype=np.float64)
                assert_raises(ValueError, _sparsetools.csr_matvec,
                              2, 2, a.indptr, a.indices, a.data, b, c)

            c = np.zeros((2,), dtype=np.result_type(a_dtype, b_dtype))
            _sparsetools.csr_matvec(2, 2, a.indptr, a.indices, a.data, b, c)
            assert_allclose(c, np.dot(a.toarray(), b), err_msg=msg)
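

# A brief sketch of the promotion rule test_upcast checks (_demo_result_type is
# a hypothetical helper, not part of the test suite): the accepted output dtype
# is np.result_type(a_dtype, b_dtype), while the mismatched outputs the test
# tries (bool, or real when exactly one operand is complex) raise ValueError.
def _demo_result_type():
    assert np.result_type(np.int8, np.complex64) == np.complex64
    assert np.result_type(np.float32, np.int64) == np.float64
    assert np.result_type(np.bool_, np.bool_) == np.bool_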


def test_endianness():
    d = np.ones((3, 4))
    offsets = [-1, 0, 1]

    a = dia_matrix((d.astype('<f8'), offsets), (4, 4))
    b = dia_matrix((d.astype('>f8'), offsets), (4, 4))
    v = np.arange(4)

    assert_allclose(a.dot(v), [1, 3, 6, 5])
    assert_allclose(b.dot(v), [1, 3, 6, 5])
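

# A dense cross-check of the expected result above (_demo_endianness_expected
# is a hypothetical helper, not part of the test suite): both byte orders
# describe the same tridiagonal matrix of ones, and multiplying it by
# arange(4) gives [1, 3, 6, 5].
def _demo_endianness_expected():
    dense = np.eye(4, k=-1) + np.eye(4) + np.eye(4, k=1)
    assert_allclose(dense.dot(np.arange(4)), [1, 3, 6, 5])
    # The little- and big-endian data arrays hold identical values.
    d = np.ones((3, 4))
    assert_equal(d.astype('<f8'), d.astype('>f8'))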