job_task_source.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // Copyright 2019 The Chromium Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file.
  4. #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
  5. #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
  6. #include <stddef.h>
  7. #include <atomic>
  8. #include <limits>
  9. #include "base/base_export.h"
  10. #include "base/callback.h"
  11. #include "base/macros.h"
  12. #include "base/optional.h"
  13. #include "base/synchronization/condition_variable.h"
  14. #include "base/task/common/checked_lock.h"
  15. #include "base/task/post_job.h"
  16. #include "base/task/task_traits.h"
  17. #include "base/task/thread_pool/task.h"
  18. #include "base/task/thread_pool/task_source.h"
  19. #include "base/task/thread_pool/task_source_sort_key.h"
  20. namespace base {
  21. namespace internal {
  22. class PooledTaskRunnerDelegate;
  23. // A JobTaskSource generates many Tasks from a single RepeatingClosure.
  24. //
  25. // Derived classes control the intended concurrency with GetMaxConcurrency().
  26. class BASE_EXPORT JobTaskSource : public TaskSource {
  27. public:
  28. JobTaskSource(const Location& from_here,
  29. const TaskTraits& traits,
  30. RepeatingCallback<void(JobDelegate*)> worker_task,
  31. MaxConcurrencyCallback max_concurrency_callback,
  32. PooledTaskRunnerDelegate* delegate);
  33. static JobHandle CreateJobHandle(
  34. scoped_refptr<internal::JobTaskSource> task_source) {
  35. return JobHandle(std::move(task_source));
  36. }
  37. // Notifies this task source that max concurrency was increased, and the
  38. // number of worker should be adjusted.
  39. void NotifyConcurrencyIncrease();
  40. // Informs this JobTaskSource that the current thread would like to join and
  41. // contribute to running |worker_task|. Returns true if the joining thread can
  42. // contribute (RunJoinTask() can be called), or false if joining was completed
  43. // and all other workers returned because either there's no work remaining or
  44. // Job was cancelled.
  45. bool WillJoin();
  46. // Contributes to running |worker_task| and returns true if the joining thread
  47. // can contribute again (RunJoinTask() can be called again), or false if
  48. // joining was completed and all other workers returned because either there's
  49. // no work remaining or Job was cancelled. This should be called only after
  50. // WillJoin() or RunJoinTask() previously returned true.
  51. bool RunJoinTask();
  52. // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
  53. // to return RunStatus::kDisallowed.
  54. void Cancel(TaskSource::Transaction* transaction = nullptr);
  55. // TaskSource:
  56. ExecutionEnvironment GetExecutionEnvironment() override;
  57. size_t GetRemainingConcurrency() const override;
  58. TaskSourceSortKey GetSortKey() const override;
  59. bool IsCompleted() const;
  60. size_t GetWorkerCount() const;
  61. // Returns the maximum number of tasks from this TaskSource that can run
  62. // concurrently.
  63. size_t GetMaxConcurrency() const;
  64. uint8_t AcquireTaskId();
  65. void ReleaseTaskId(uint8_t task_id);
  66. // Returns true if a worker should return from the worker task on the current
  67. // thread ASAP.
  68. bool ShouldYield();
  69. PooledTaskRunnerDelegate* delegate() const { return delegate_; }
  70. private:
  71. // Atomic internal state to track the number of workers running a task from
  72. // this JobTaskSource and whether this JobTaskSource is canceled. All
  73. // operations are performed with std::memory_order_relaxed as State is only
  74. // ever modified under a lock or read atomically (optimistic read).
  75. class State {
  76. public:
  77. static constexpr size_t kCanceledMask = 1;
  78. static constexpr size_t kWorkerCountBitOffset = 1;
  79. static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
  80. struct Value {
  81. size_t worker_count() const { return value >> kWorkerCountBitOffset; }
  82. // Returns true if canceled.
  83. bool is_canceled() const { return value & kCanceledMask; }
  84. uint32_t value;
  85. };
  86. State();
  87. ~State();
  88. // Sets as canceled. Returns the state
  89. // before the operation.
  90. Value Cancel();
  91. // Increments the worker count by 1. Returns the state before the operation.
  92. Value IncrementWorkerCount();
  93. // Decrements the worker count by 1. Returns the state before the operation.
  94. Value DecrementWorkerCount();
  95. // Loads and returns the state.
  96. Value Load() const;
  97. private:
  98. std::atomic<uint32_t> value_{0};
  99. };
  100. // Atomic flag that indicates if the joining thread is currently waiting on
  101. // another worker to yield or to signal.
  102. class JoinFlag {
  103. public:
  104. static constexpr uint32_t kNotWaiting = 0;
  105. static constexpr uint32_t kWaitingForWorkerToSignal = 1;
  106. static constexpr uint32_t kWaitingForWorkerToYield = 3;
  107. // kWaitingForWorkerToYield is 3 because the impl relies on the following
  108. // property.
  109. static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
  110. kWaitingForWorkerToSignal,
  111. "");
  112. JoinFlag();
  113. ~JoinFlag();
  114. // Returns true if the status is not kNotWaiting, using
  115. // std::memory_order_relaxed.
  116. bool IsWaiting() {
  117. return value_.load(std::memory_order_relaxed) != kNotWaiting;
  118. }
  119. // Sets the status as kWaitingForWorkerToYield using
  120. // std::memory_order_relaxed.
  121. void SetWaiting();
  122. // If the flag is kWaitingForWorkerToYield, returns true indicating that the
  123. // worker should yield, and atomically updates to kWaitingForWorkerToSignal
  124. // (using std::memory_order_relaxed) to ensure that a single worker yields
  125. // in response to SetWaiting().
  126. bool ShouldWorkerYield();
  127. // If the flag is kWaiting*, returns true indicating that the worker should
  128. // signal, and atomically updates to kNotWaiting (using
  129. // std::memory_order_relaxed) to ensure that a single worker signals in
  130. // response to SetWaiting().
  131. bool ShouldWorkerSignal();
  132. private:
  133. std::atomic<uint32_t> value_{kNotWaiting};
  134. };
  135. ~JobTaskSource() override;
  136. // Called from the joining thread. Waits for the worker count to be below or
  137. // equal to max concurrency (will happen when a worker calls
  138. // DidProcessTask()). Returns true if the joining thread should run a task, or
  139. // false if joining was completed and all other workers returned because
  140. // either there's no work remaining or Job was cancelled.
  141. bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
  142. size_t GetMaxConcurrency(size_t worker_count) const;
  143. // TaskSource:
  144. RunStatus WillRunTask() override;
  145. Task TakeTask(TaskSource::Transaction* transaction) override;
  146. Task Clear(TaskSource::Transaction* transaction) override;
  147. bool DidProcessTask(TaskSource::Transaction* transaction) override;
  148. // Synchronizes access to workers state.
  149. mutable CheckedLock worker_lock_{UniversalSuccessor()};
  150. // Current atomic state (atomic despite the lock to allow optimistic reads
  151. // without the lock).
  152. State state_ GUARDED_BY(worker_lock_);
  153. // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
  154. // hence the use of atomics.
  155. JoinFlag join_flag_ GUARDED_BY(worker_lock_);
  156. // Signaled when |join_flag_| is kWaiting* and a worker returns.
  157. std::unique_ptr<ConditionVariable> worker_released_condition_
  158. GUARDED_BY(worker_lock_);
  159. std::atomic<uint32_t> assigned_task_ids_{0};
  160. const Location from_here_;
  161. RepeatingCallback<size_t(size_t)> max_concurrency_callback_;
  162. // Worker task set by the job owner.
  163. RepeatingCallback<void(JobDelegate*)> worker_task_;
  164. // Task returned from TakeTask(), that calls |worker_task_| internally.
  165. RepeatingClosure primary_task_;
  166. const TimeTicks ready_time_;
  167. PooledTaskRunnerDelegate* delegate_;
  168. DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
  169. };
  170. } // namespace internal
  171. } // namespace base
  172. #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_