  // Copyright 2019 The Chromium Authors. All rights reserved.
  // Use of this source code is governed by a BSD-style license that can be
  // found in the LICENSE file.

  #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
  #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_

  #include <stddef.h>

  #include <atomic>

  #include "base/base_export.h"
  #include "base/compiler_specific.h"
  #include "base/macros.h"
  #include "base/memory/ref_counted.h"
  #include "base/optional.h"
  #include "base/sequence_token.h"
  #include "base/task/common/checked_lock.h"
  #include "base/task/common/intrusive_heap.h"
  #include "base/task/task_traits.h"
  #include "base/task/thread_pool/task.h"
  #include "base/task/thread_pool/task_source_sort_key.h"
  #include "base/threading/sequence_local_storage_map.h"

  namespace base {
  namespace internal {

  class TaskTracker;

  // Execution mode of a TaskSource. Per the TaskSource constructor contract,
  // this is the execution mode of the TaskRunner feeding the TaskSource, or
  // kParallel when the TaskSource has no TaskRunner.
  enum class TaskSourceExecutionMode {
    kParallel,
    kSequenced,
    kSingleThread,
    kJob,
    // Alias of the last enumerator (currently kJob). Update it whenever a new
    // mode is appended after kJob.
    kMax = kJob,
  };
  29. struct BASE_EXPORT ExecutionEnvironment {
  30. SequenceToken token;
  31. SequenceLocalStorageMap* sequence_local_storage;
  32. };
  33. // A TaskSource is a virtual class that provides a series of Tasks that must be
  34. // executed.
  35. //
  36. // A task source is registered when it's ready to be queued. A task source is
  37. // ready to be queued when either:
  38. // 1- It has new tasks that can run concurrently as a result of external
  39. // operations, e.g. posting a new task to an empty Sequence or increasing
  40. // max concurrency of a JobTaskSource;
  41. // 2- A worker finished running a task from it and DidProcessTask() returned
  42. // true; or
  43. // 3- A worker is about to run a task from it and WillRunTask() returned
  44. // kAllowedNotSaturated.
  45. //
  46. // A worker may perform the following sequence of operations on a
  47. // RegisteredTaskSource after obtaining it from the queue:
  48. // 1- Check whether a task can run with WillRunTask() (and register/enqueue the
  49. // task source again if not saturated).
  50. // 2- (optional) Iff (1) determined that a task can run, access the next task
  51. // with TakeTask().
  52. // 3- (optional) Execute the task.
  53. // 4- Inform the task source that a task was processed with DidProcessTask(),
  54. // and re-enqueue the task source iff requested.
  55. // When a task source is registered multiple times, many overlapping chains of
  56. // operations may run concurrently, as permitted by WillRunTask(). This allows
  57. // tasks from the same task source to run in parallel.
  58. // However, the following invariants are kept:
  59. // - The number of workers concurrently running tasks never goes over the
  60. // intended concurrency.
  61. // - If the task source has more tasks that can run concurrently, it must be
  62. // queued.
  63. //
  64. // Note: there is a known refcounted-ownership cycle in the ThreadPool
  65. // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so
  66. // long as the other owners of TaskSource (PriorityQueue and WorkerThread in
  67. // alternation and ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork()
  68. // temporarily) keep running it (and taking Tasks from it as a result). A
  69. // dangling reference cycle would only occur should they release their reference
  70. // to it while it's not empty. In other words, it is only correct for them to
  71. // release it when DidProcessTask() returns false.
  72. //
  73. // This class is thread-safe.
  74. class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
  75. public:
  76. // Indicates whether WillRunTask() allows TakeTask() to be called on a
  77. // RegisteredTaskSource.
  78. enum class RunStatus {
  79. // TakeTask() cannot be called.
  80. kDisallowed,
  81. // TakeTask() may called, and the TaskSource has not reached its maximum
  82. // concurrency (i.e. the TaskSource still needs to be queued).
  83. kAllowedNotSaturated,
  84. // TakeTask() may called, and the TaskSource has reached its maximum
  85. // concurrency (i.e. the TaskSource no longer needs to be queued).
  86. kAllowedSaturated,
  87. };
  88. // A Transaction can perform multiple operations atomically on a
  89. // TaskSource. While a Transaction is alive, it is guaranteed that nothing
  90. // else will access the TaskSource; the TaskSource's lock is held for the
  91. // lifetime of the Transaction.
  92. class BASE_EXPORT Transaction {
  93. public:
  94. Transaction(Transaction&& other);
  95. ~Transaction();
  96. operator bool() const { return !!task_source_; }
  97. // Sets TaskSource priority to |priority|.
  98. void UpdatePriority(TaskPriority priority);
  99. // Returns the traits of all Tasks in the TaskSource.
  100. TaskTraits traits() const { return task_source_->traits_; }
  101. TaskSource* task_source() const { return task_source_; }
  102. protected:
  103. explicit Transaction(TaskSource* task_source);
  104. private:
  105. friend class TaskSource;
  106. TaskSource* task_source_;
  107. DISALLOW_COPY_AND_ASSIGN(Transaction);
  108. };
  109. // |traits| is metadata that applies to all Tasks in the TaskSource.
  110. // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
  111. // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
  112. // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
  113. // execution mode of |task_runner|.
  114. TaskSource(const TaskTraits& traits,
  115. TaskRunner* task_runner,
  116. TaskSourceExecutionMode execution_mode);
  117. // Begins a Transaction. This method cannot be called on a thread which has an
  118. // active TaskSource::Transaction.
  119. Transaction BeginTransaction() WARN_UNUSED_RESULT;
  120. virtual ExecutionEnvironment GetExecutionEnvironment() = 0;
  121. // Thread-safe but the returned value may immediately be obsolete. As such
  122. // this should only be used as a best-effort guess of how many more workers
  123. // are needed. This may be called on an empty task source.
  124. virtual size_t GetRemainingConcurrency() const = 0;
  125. // Returns a TaskSourceSortKey representing the priority of the TaskSource.
  126. virtual TaskSourceSortKey GetSortKey() const = 0;
  127. // Support for IntrusiveHeap.
  128. void SetHeapHandle(const HeapHandle& handle);
  129. void ClearHeapHandle();
  130. HeapHandle GetHeapHandle() const { return heap_handle_; }
  131. HeapHandle heap_handle() const { return heap_handle_; }
  132. // Returns the shutdown behavior of all Tasks in the TaskSource. Can be
  133. // accessed without a Transaction because it is never mutated.
  134. TaskShutdownBehavior shutdown_behavior() const {
  135. return traits_.shutdown_behavior();
  136. }
  137. // Returns a racy priority of the TaskSource. Can be accessed without a
  138. // Transaction but may return an outdated result.
  139. TaskPriority priority_racy() const {
  140. return priority_racy_.load(std::memory_order_relaxed);
  141. }
  142. // Returns the thread policy of the TaskSource. Can be accessed without a
  143. // Transaction because it is never mutated.
  144. ThreadPolicy thread_policy() const { return traits_.thread_policy(); }
  145. // A reference to TaskRunner is only retained between PushTask() and when
  146. // DidProcessTask() returns false, guaranteeing it is safe to dereference this
  147. // pointer. Otherwise, the caller should guarantee such TaskRunner still
  148. // exists before dereferencing.
  149. TaskRunner* task_runner() const { return task_runner_; }
  150. TaskSourceExecutionMode execution_mode() const { return execution_mode_; }
  151. protected:
  152. virtual ~TaskSource();
  153. virtual RunStatus WillRunTask() = 0;
  154. // Implementations of TakeTask(), DidProcessTask() and Clear() must ensure
  155. // proper synchronization iff |transaction| is nullptr.
  156. virtual Task TakeTask(TaskSource::Transaction* transaction) = 0;
  157. virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0;
  158. // This may be called for each outstanding RegisteredTaskSource that's ready.
  159. // The implementation needs to support this being called multiple times;
  160. // unless it guarantees never to hand-out multiple RegisteredTaskSources that
  161. // are concurrently ready.
  162. virtual Task Clear(TaskSource::Transaction* transaction) = 0;
  163. // Sets TaskSource priority to |priority|.
  164. void UpdatePriority(TaskPriority priority);
  165. // The TaskTraits of all Tasks in the TaskSource.
  166. TaskTraits traits_;
  167. // The cached priority for atomic access.
  168. std::atomic<TaskPriority> priority_racy_;
  169. // Synchronizes access to all members.
  170. mutable CheckedLock lock_{UniversalPredecessor()};
  171. private:
  172. friend class RefCountedThreadSafe<TaskSource>;
  173. friend class RegisteredTaskSource;
  174. // The TaskSource's position in its current PriorityQueue. Access is protected
  175. // by the PriorityQueue's lock.
  176. HeapHandle heap_handle_;
  177. // A pointer to the TaskRunner that posts to this TaskSource, if any. The
  178. // derived class is responsible for calling AddRef() when a TaskSource from
  179. // which no Task is executing becomes non-empty and Release() when
  180. // it becomes empty again (e.g. when DidProcessTask() returns false).
  181. TaskRunner* task_runner_;
  182. TaskSourceExecutionMode execution_mode_;
  183. DISALLOW_COPY_AND_ASSIGN(TaskSource);
  184. };
  185. // Wrapper around TaskSource to signify the intent to queue and run it.
  186. // RegisteredTaskSource can only be created with TaskTracker and may only be
  187. // used by a single worker at a time. However, the same task source may be
  188. // registered several times, spawning multiple RegisteredTaskSources. A
  189. // RegisteredTaskSource resets to its initial state when WillRunTask() fails
  190. // or after DidProcessTask(), so it can be used again.
  191. class BASE_EXPORT RegisteredTaskSource {
  192. public:
  193. RegisteredTaskSource();
  194. RegisteredTaskSource(std::nullptr_t);
  195. RegisteredTaskSource(RegisteredTaskSource&& other) noexcept;
  196. ~RegisteredTaskSource();
  197. RegisteredTaskSource& operator=(RegisteredTaskSource&& other);
  198. operator bool() const { return task_source_ != nullptr; }
  199. TaskSource* operator->() const { return task_source_.get(); }
  200. TaskSource* get() const { return task_source_.get(); }
  201. static RegisteredTaskSource CreateForTesting(
  202. scoped_refptr<TaskSource> task_source,
  203. TaskTracker* task_tracker = nullptr);
  204. // Can only be called if this RegisteredTaskSource is in its initial state.
  205. // Returns the underlying task source. An Optional is used in preparation for
  206. // the merge between ThreadPool and TaskQueueManager (in Blink).
  207. // https://crbug.com/783309
  208. scoped_refptr<TaskSource> Unregister();
  209. // Informs this TaskSource that the current worker would like to run a Task
  210. // from it. Can only be called if in its initial state. Returns a RunStatus
  211. // that indicates if the operation is allowed (TakeTask() can be called).
  212. TaskSource::RunStatus WillRunTask();
  213. // Returns the next task to run from this TaskSource. This should be called
  214. // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is
  215. // optional and should only be provided if this operation is already part of
  216. // a transaction.
  217. Task TakeTask(TaskSource::Transaction* transaction = nullptr)
  218. WARN_UNUSED_RESULT;
  219. // Must be called after WillRunTask() or once the task was run if TakeTask()
  220. // was called. This resets this RegisteredTaskSource to its initial state so
  221. // that WillRunTask() may be called again. |transaction| is optional and
  222. // should only be provided if this operation is already part of a transaction.
  223. // Returns true if the TaskSource should be queued after this operation.
  224. bool DidProcessTask(TaskSource::Transaction* transaction = nullptr);
  225. // Returns a task that clears this TaskSource to make it empty. |transaction|
  226. // is optional and should only be provided if this operation is already part
  227. // of a transaction.
  228. Task Clear(TaskSource::Transaction* transaction = nullptr) WARN_UNUSED_RESULT;
  229. private:
  230. friend class TaskTracker;
  231. RegisteredTaskSource(scoped_refptr<TaskSource> task_source,
  232. TaskTracker* task_tracker);
  233. #if DCHECK_IS_ON()
  234. // Indicates the step of a task execution chain.
  235. enum class State {
  236. kInitial, // WillRunTask() may be called.
  237. kReady, // After WillRunTask() returned a valid RunStatus.
  238. };
  239. State run_step_ = State::kInitial;
  240. #endif // DCHECK_IS_ON()
  241. scoped_refptr<TaskSource> task_source_;
  242. TaskTracker* task_tracker_ = nullptr;
  243. DISALLOW_COPY_AND_ASSIGN(RegisteredTaskSource);
  244. };
  245. // A pair of Transaction and RegisteredTaskSource. Useful to carry a
  246. // RegisteredTaskSource with an associated Transaction.
  247. // TODO(crbug.com/839091): Rename to RegisteredTaskSourceAndTransaction.
  248. struct BASE_EXPORT TransactionWithRegisteredTaskSource {
  249. public:
  250. TransactionWithRegisteredTaskSource(RegisteredTaskSource task_source_in,
  251. TaskSource::Transaction transaction_in);
  252. TransactionWithRegisteredTaskSource(
  253. TransactionWithRegisteredTaskSource&& other) = default;
  254. ~TransactionWithRegisteredTaskSource() = default;
  255. static TransactionWithRegisteredTaskSource FromTaskSource(
  256. RegisteredTaskSource task_source_in);
  257. RegisteredTaskSource task_source;
  258. TaskSource::Transaction transaction;
  259. DISALLOW_COPY_AND_ASSIGN(TransactionWithRegisteredTaskSource);
  260. };

  }  // namespace internal
  }  // namespace base

  #endif  // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_