thread_pool.hpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. //
  2. // impl/thread_pool.hpp
  3. // ~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_IMPL_THREAD_POOL_HPP
  11. #define BOOST_ASIO_IMPL_THREAD_POOL_HPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/blocking_executor_op.hpp>
  16. #include <boost/asio/detail/bulk_executor_op.hpp>
  17. #include <boost/asio/detail/executor_op.hpp>
  18. #include <boost/asio/detail/fenced_block.hpp>
  19. #include <boost/asio/detail/non_const_lvalue.hpp>
  20. #include <boost/asio/detail/type_traits.hpp>
  21. #include <boost/asio/execution_context.hpp>
  22. #include <boost/asio/detail/push_options.hpp>
  23. namespace boost {
  24. namespace asio {
  25. inline thread_pool::executor_type
  26. thread_pool::get_executor() BOOST_ASIO_NOEXCEPT
  27. {
  28. return executor_type(*this);
  29. }
  30. inline thread_pool::executor_type
  31. thread_pool::executor() BOOST_ASIO_NOEXCEPT
  32. {
  33. return executor_type(*this);
  34. }
  35. inline thread_pool::scheduler_type
  36. thread_pool::scheduler() BOOST_ASIO_NOEXCEPT
  37. {
  38. return scheduler_type(*this);
  39. }
  40. template <typename Allocator, unsigned int Bits>
  41. thread_pool::basic_executor_type<Allocator, Bits>&
  42. thread_pool::basic_executor_type<Allocator, Bits>::operator=(
  43. const basic_executor_type& other) BOOST_ASIO_NOEXCEPT
  44. {
  45. if (this != &other)
  46. {
  47. thread_pool* old_thread_pool = pool_;
  48. pool_ = other.pool_;
  49. allocator_ = other.allocator_;
  50. bits_ = other.bits_;
  51. if (Bits & outstanding_work_tracked)
  52. {
  53. if (pool_)
  54. pool_->scheduler_.work_started();
  55. if (old_thread_pool)
  56. old_thread_pool->scheduler_.work_finished();
  57. }
  58. }
  59. return *this;
  60. }
  61. #if defined(BOOST_ASIO_HAS_MOVE)
  62. template <typename Allocator, unsigned int Bits>
  63. thread_pool::basic_executor_type<Allocator, Bits>&
  64. thread_pool::basic_executor_type<Allocator, Bits>::operator=(
  65. basic_executor_type&& other) BOOST_ASIO_NOEXCEPT
  66. {
  67. if (this != &other)
  68. {
  69. thread_pool* old_thread_pool = pool_;
  70. pool_ = other.pool_;
  71. allocator_ = std::move(other.allocator_);
  72. bits_ = other.bits_;
  73. if (Bits & outstanding_work_tracked)
  74. {
  75. other.pool_ = 0;
  76. if (old_thread_pool)
  77. old_thread_pool->scheduler_.work_finished();
  78. }
  79. }
  80. return *this;
  81. }
  82. #endif // defined(BOOST_ASIO_HAS_MOVE)
  83. template <typename Allocator, unsigned int Bits>
  84. inline bool thread_pool::basic_executor_type<Allocator,
  85. Bits>::running_in_this_thread() const BOOST_ASIO_NOEXCEPT
  86. {
  87. return pool_->scheduler_.can_dispatch();
  88. }
  89. template <typename Allocator, unsigned int Bits>
  90. template <typename Function>
  91. void thread_pool::basic_executor_type<Allocator,
  92. Bits>::do_execute(BOOST_ASIO_MOVE_ARG(Function) f, false_type) const
  93. {
  94. typedef typename decay<Function>::type function_type;
  95. // Invoke immediately if the blocking.possibly property is enabled and we are
  96. // already inside the thread pool.
  97. if ((bits_ & blocking_never) == 0 && pool_->scheduler_.can_dispatch())
  98. {
  99. // Make a local, non-const copy of the function.
  100. function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(f));
  101. #if defined(BOOST_ASIO_HAS_STD_EXCEPTION_PTR) \
  102. && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  103. try
  104. {
  105. #endif // defined(BOOST_ASIO_HAS_STD_EXCEPTION_PTR)
  106. // && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  107. detail::fenced_block b(detail::fenced_block::full);
  108. boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
  109. return;
  110. #if defined(BOOST_ASIO_HAS_STD_EXCEPTION_PTR) \
  111. && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  112. }
  113. catch (...)
  114. {
  115. pool_->scheduler_.capture_current_exception();
  116. return;
  117. }
  118. #endif // defined(BOOST_ASIO_HAS_STD_EXCEPTION_PTR)
  119. // && !defined(BOOST_ASIO_NO_EXCEPTIONS)
  120. }
  121. // Allocate and construct an operation to wrap the function.
  122. typedef detail::executor_op<function_type, Allocator> op;
  123. typename op::ptr p = { detail::addressof(allocator_),
  124. op::ptr::allocate(allocator_), 0 };
  125. p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(f), allocator_);
  126. if ((bits_ & relationship_continuation) != 0)
  127. {
  128. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  129. "thread_pool", pool_, 0, "execute(blk=never,rel=cont)"));
  130. }
  131. else
  132. {
  133. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  134. "thread_pool", pool_, 0, "execute(blk=never,rel=fork)"));
  135. }
  136. pool_->scheduler_.post_immediate_completion(p.p,
  137. (bits_ & relationship_continuation) != 0);
  138. p.v = p.p = 0;
  139. }
  140. template <typename Allocator, unsigned int Bits>
  141. template <typename Function>
  142. void thread_pool::basic_executor_type<Allocator,
  143. Bits>::do_execute(BOOST_ASIO_MOVE_ARG(Function) f, true_type) const
  144. {
  145. // Obtain a non-const instance of the function.
  146. detail::non_const_lvalue<Function> f2(f);
  147. // Invoke immediately if we are already inside the thread pool.
  148. if (pool_->scheduler_.can_dispatch())
  149. {
  150. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  151. try
  152. {
  153. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  154. detail::fenced_block b(detail::fenced_block::full);
  155. boost_asio_handler_invoke_helpers::invoke(f2.value, f2.value);
  156. return;
  157. #if !defined(BOOST_ASIO_NO_EXCEPTIONS)
  158. }
  159. catch (...)
  160. {
  161. std::terminate();
  162. }
  163. #endif // !defined(BOOST_ASIO_NO_EXCEPTIONS)
  164. }
  165. // Construct an operation to wrap the function.
  166. typedef typename decay<Function>::type function_type;
  167. detail::blocking_executor_op<function_type> op(f2.value);
  168. BOOST_ASIO_HANDLER_CREATION((*pool_, op,
  169. "thread_pool", pool_, 0, "execute(blk=always)"));
  170. pool_->scheduler_.post_immediate_completion(&op, false);
  171. op.wait();
  172. }
  173. template <typename Allocator, unsigned int Bits>
  174. template <typename Function>
  175. void thread_pool::basic_executor_type<Allocator, Bits>::do_bulk_execute(
  176. BOOST_ASIO_MOVE_ARG(Function) f, std::size_t n, false_type) const
  177. {
  178. typedef typename decay<Function>::type function_type;
  179. typedef detail::bulk_executor_op<function_type, Allocator> op;
  180. // Allocate and construct operations to wrap the function.
  181. detail::op_queue<detail::scheduler_operation> ops;
  182. for (std::size_t i = 0; i < n; ++i)
  183. {
  184. typename op::ptr p = { detail::addressof(allocator_),
  185. op::ptr::allocate(allocator_), 0 };
  186. p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(f), allocator_, i);
  187. ops.push(p.p);
  188. if ((bits_ & relationship_continuation) != 0)
  189. {
  190. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  191. "thread_pool", pool_, 0, "bulk_execute(blk=never,rel=cont)"));
  192. }
  193. else
  194. {
  195. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  196. "thread_pool", pool_, 0, "bulk)execute(blk=never,rel=fork)"));
  197. }
  198. p.v = p.p = 0;
  199. }
  200. pool_->scheduler_.post_immediate_completions(n,
  201. ops, (bits_ & relationship_continuation) != 0);
  202. }
  203. template <typename Function>
  204. struct thread_pool_always_blocking_function_adapter
  205. {
  206. typename decay<Function>::type* f;
  207. std::size_t n;
  208. void operator()()
  209. {
  210. for (std::size_t i = 0; i < n; ++i)
  211. {
  212. (*f)(i);
  213. }
  214. }
  215. };
  216. template <typename Allocator, unsigned int Bits>
  217. template <typename Function>
  218. void thread_pool::basic_executor_type<Allocator, Bits>::do_bulk_execute(
  219. BOOST_ASIO_MOVE_ARG(Function) f, std::size_t n, true_type) const
  220. {
  221. // Obtain a non-const instance of the function.
  222. detail::non_const_lvalue<Function> f2(f);
  223. thread_pool_always_blocking_function_adapter<Function>
  224. adapter = { detail::addressof(f2.value), n };
  225. this->do_execute(adapter, true_type());
  226. }
  227. #if !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  228. template <typename Allocator, unsigned int Bits>
  229. inline thread_pool& thread_pool::basic_executor_type<
  230. Allocator, Bits>::context() const BOOST_ASIO_NOEXCEPT
  231. {
  232. return *pool_;
  233. }
  234. template <typename Allocator, unsigned int Bits>
  235. inline void thread_pool::basic_executor_type<Allocator,
  236. Bits>::on_work_started() const BOOST_ASIO_NOEXCEPT
  237. {
  238. pool_->scheduler_.work_started();
  239. }
  240. template <typename Allocator, unsigned int Bits>
  241. inline void thread_pool::basic_executor_type<Allocator,
  242. Bits>::on_work_finished() const BOOST_ASIO_NOEXCEPT
  243. {
  244. pool_->scheduler_.work_finished();
  245. }
  246. template <typename Allocator, unsigned int Bits>
  247. template <typename Function, typename OtherAllocator>
  248. void thread_pool::basic_executor_type<Allocator, Bits>::dispatch(
  249. BOOST_ASIO_MOVE_ARG(Function) f, const OtherAllocator& a) const
  250. {
  251. typedef typename decay<Function>::type function_type;
  252. // Invoke immediately if we are already inside the thread pool.
  253. if (pool_->scheduler_.can_dispatch())
  254. {
  255. // Make a local, non-const copy of the function.
  256. function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(f));
  257. detail::fenced_block b(detail::fenced_block::full);
  258. boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
  259. return;
  260. }
  261. // Allocate and construct an operation to wrap the function.
  262. typedef detail::executor_op<function_type, OtherAllocator> op;
  263. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  264. p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(f), a);
  265. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  266. "thread_pool", pool_, 0, "dispatch"));
  267. pool_->scheduler_.post_immediate_completion(p.p, false);
  268. p.v = p.p = 0;
  269. }
  270. template <typename Allocator, unsigned int Bits>
  271. template <typename Function, typename OtherAllocator>
  272. void thread_pool::basic_executor_type<Allocator, Bits>::post(
  273. BOOST_ASIO_MOVE_ARG(Function) f, const OtherAllocator& a) const
  274. {
  275. typedef typename decay<Function>::type function_type;
  276. // Allocate and construct an operation to wrap the function.
  277. typedef detail::executor_op<function_type, OtherAllocator> op;
  278. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  279. p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(f), a);
  280. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  281. "thread_pool", pool_, 0, "post"));
  282. pool_->scheduler_.post_immediate_completion(p.p, false);
  283. p.v = p.p = 0;
  284. }
  285. template <typename Allocator, unsigned int Bits>
  286. template <typename Function, typename OtherAllocator>
  287. void thread_pool::basic_executor_type<Allocator, Bits>::defer(
  288. BOOST_ASIO_MOVE_ARG(Function) f, const OtherAllocator& a) const
  289. {
  290. typedef typename decay<Function>::type function_type;
  291. // Allocate and construct an operation to wrap the function.
  292. typedef detail::executor_op<function_type, OtherAllocator> op;
  293. typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  294. p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(f), a);
  295. BOOST_ASIO_HANDLER_CREATION((*pool_, *p.p,
  296. "thread_pool", pool_, 0, "defer"));
  297. pool_->scheduler_.post_immediate_completion(p.p, true);
  298. p.v = p.p = 0;
  299. }
  300. #endif // !defined(BOOST_ASIO_NO_TS_EXECUTORS)
  301. } // namespace asio
  302. } // namespace boost
  303. #include <boost/asio/detail/pop_options.hpp>
  304. #endif // BOOST_ASIO_IMPL_THREAD_POOL_HPP