- """
- Tests multithreading behaviour for reading and
- parsing files for each parser defined in parsers.py
- """
- from contextlib import ExitStack
- from io import BytesIO
- from multiprocessing.pool import ThreadPool
- import numpy as np
- import pytest
- import pandas as pd
- from pandas import DataFrame
- import pandas._testing as tm
- # We'll probably always skip these for pyarrow
- # Maybe we'll add our own tests for pyarrow too
- pytestmark = pytest.mark.usefixtures("pyarrow_skip")


def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.

    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.

    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"
    df["date"] = pd.date_range("20000101 09:00:00", periods=num_rows, freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df


@pytest.mark.slow
def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        "\n".join([f"{i:d},{i:d},{i:d}" for i in range(max_row_range)]).encode()
        for _ in range(num_files)
    ]

    # Read all files in many threads.
    with ExitStack() as stack:
        files = [stack.enter_context(BytesIO(b)) for b in bytes_to_df]

        pool = stack.enter_context(ThreadPool(8))

        results = pool.map(parser.read_csv, files)
        first_result = results[0]
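
        # Every in-memory file holds identical CSV content, so each parsed
        # frame should match the first result regardless of thread scheduling.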
        for result in results:
            tm.assert_frame_equal(first_result, result)


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame by reading a CSV file in chunks across multiple threads.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The total number of rows to read, split evenly across the tasks.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """

    def reader(arg):
        """
        Read one chunk of the CSV file.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The row at which to start parsing the CSV.
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg

        if not start:
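            # The first chunk also reads the header row, so the "date"
            # column can be referenced by name.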
            return parser.read_csv(
                path, index_col=0, header=0, nrows=nrows, parse_dates=["date"]
            )
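
        # Later chunks skip the header plus the rows handled by earlier
        # chunks; with header=None the date column is addressed by position.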
        return parser.read_csv(
            path,
            index_col=0,
            header=None,
            skiprows=int(start) + 1,
            nrows=nrows,
            parse_dates=[9],
        )
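
    # Each task is a (start_row, rows_to_read) pair splitting the file into
    # num_tasks equal chunks.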
    tasks = [
        (num_rows * i // num_tasks, num_rows // num_tasks) for i in range(num_tasks)
    ]

    with ThreadPool(processes=num_tasks) as pool:
        results = pool.map(reader, tasks)
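
    # Chunks read with header=None come back with integer column labels;
    # re-apply the header chunk's columns before concatenating.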
    header = results[0].columns

    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe


@pytest.mark.slow
def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 100000

    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)
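
        # Read the file back in num_tasks chunks on separate threads and
        # check that the reassembled frame matches the original.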
        final_dataframe = _generate_multi_thread_dataframe(
            parser, path, num_rows, num_tasks
        )
        tm.assert_frame_equal(df, final_dataframe)