123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- // Copyright 2019 The Chromium Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
- #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
- #include <stddef.h>
- #include <atomic>
- #include <limits>
- #include "base/base_export.h"
- #include "base/callback.h"
- #include "base/macros.h"
- #include "base/optional.h"
- #include "base/synchronization/condition_variable.h"
- #include "base/task/common/checked_lock.h"
- #include "base/task/post_job.h"
- #include "base/task/task_traits.h"
- #include "base/task/thread_pool/task.h"
- #include "base/task/thread_pool/task_source.h"
- #include "base/task/thread_pool/task_source_sort_key.h"
- namespace base {
- namespace internal {
- class PooledTaskRunnerDelegate;
- // A JobTaskSource generates many Tasks from a single RepeatingClosure.
- //
- // Derived classes control the intended concurrency with GetMaxConcurrency().
- class BASE_EXPORT JobTaskSource : public TaskSource {
- public:
- JobTaskSource(const Location& from_here,
- const TaskTraits& traits,
- RepeatingCallback<void(JobDelegate*)> worker_task,
- MaxConcurrencyCallback max_concurrency_callback,
- PooledTaskRunnerDelegate* delegate);
- static JobHandle CreateJobHandle(
- scoped_refptr<internal::JobTaskSource> task_source) {
- return JobHandle(std::move(task_source));
- }
- // Notifies this task source that max concurrency was increased, and the
- // number of worker should be adjusted.
- void NotifyConcurrencyIncrease();
- // Informs this JobTaskSource that the current thread would like to join and
- // contribute to running |worker_task|. Returns true if the joining thread can
- // contribute (RunJoinTask() can be called), or false if joining was completed
- // and all other workers returned because either there's no work remaining or
- // Job was cancelled.
- bool WillJoin();
- // Contributes to running |worker_task| and returns true if the joining thread
- // can contribute again (RunJoinTask() can be called again), or false if
- // joining was completed and all other workers returned because either there's
- // no work remaining or Job was cancelled. This should be called only after
- // WillJoin() or RunJoinTask() previously returned true.
- bool RunJoinTask();
- // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
- // to return RunStatus::kDisallowed.
- void Cancel(TaskSource::Transaction* transaction = nullptr);
- // TaskSource:
- ExecutionEnvironment GetExecutionEnvironment() override;
- size_t GetRemainingConcurrency() const override;
- TaskSourceSortKey GetSortKey() const override;
- bool IsCompleted() const;
- size_t GetWorkerCount() const;
- // Returns the maximum number of tasks from this TaskSource that can run
- // concurrently.
- size_t GetMaxConcurrency() const;
- uint8_t AcquireTaskId();
- void ReleaseTaskId(uint8_t task_id);
- // Returns true if a worker should return from the worker task on the current
- // thread ASAP.
- bool ShouldYield();
- PooledTaskRunnerDelegate* delegate() const { return delegate_; }
- private:
- // Atomic internal state to track the number of workers running a task from
- // this JobTaskSource and whether this JobTaskSource is canceled. All
- // operations are performed with std::memory_order_relaxed as State is only
- // ever modified under a lock or read atomically (optimistic read).
- class State {
- public:
- static constexpr size_t kCanceledMask = 1;
- static constexpr size_t kWorkerCountBitOffset = 1;
- static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
- struct Value {
- size_t worker_count() const { return value >> kWorkerCountBitOffset; }
- // Returns true if canceled.
- bool is_canceled() const { return value & kCanceledMask; }
- uint32_t value;
- };
- State();
- ~State();
- // Sets as canceled. Returns the state
- // before the operation.
- Value Cancel();
- // Increments the worker count by 1. Returns the state before the operation.
- Value IncrementWorkerCount();
- // Decrements the worker count by 1. Returns the state before the operation.
- Value DecrementWorkerCount();
- // Loads and returns the state.
- Value Load() const;
- private:
- std::atomic<uint32_t> value_{0};
- };
- // Atomic flag that indicates if the joining thread is currently waiting on
- // another worker to yield or to signal.
- class JoinFlag {
- public:
- static constexpr uint32_t kNotWaiting = 0;
- static constexpr uint32_t kWaitingForWorkerToSignal = 1;
- static constexpr uint32_t kWaitingForWorkerToYield = 3;
- // kWaitingForWorkerToYield is 3 because the impl relies on the following
- // property.
- static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
- kWaitingForWorkerToSignal,
- "");
- JoinFlag();
- ~JoinFlag();
- // Returns true if the status is not kNotWaiting, using
- // std::memory_order_relaxed.
- bool IsWaiting() {
- return value_.load(std::memory_order_relaxed) != kNotWaiting;
- }
- // Sets the status as kWaitingForWorkerToYield using
- // std::memory_order_relaxed.
- void SetWaiting();
- // If the flag is kWaitingForWorkerToYield, returns true indicating that the
- // worker should yield, and atomically updates to kWaitingForWorkerToSignal
- // (using std::memory_order_relaxed) to ensure that a single worker yields
- // in response to SetWaiting().
- bool ShouldWorkerYield();
- // If the flag is kWaiting*, returns true indicating that the worker should
- // signal, and atomically updates to kNotWaiting (using
- // std::memory_order_relaxed) to ensure that a single worker signals in
- // response to SetWaiting().
- bool ShouldWorkerSignal();
- private:
- std::atomic<uint32_t> value_{kNotWaiting};
- };
- ~JobTaskSource() override;
- // Called from the joining thread. Waits for the worker count to be below or
- // equal to max concurrency (will happen when a worker calls
- // DidProcessTask()). Returns true if the joining thread should run a task, or
- // false if joining was completed and all other workers returned because
- // either there's no work remaining or Job was cancelled.
- bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);
- size_t GetMaxConcurrency(size_t worker_count) const;
- // TaskSource:
- RunStatus WillRunTask() override;
- Task TakeTask(TaskSource::Transaction* transaction) override;
- Task Clear(TaskSource::Transaction* transaction) override;
- bool DidProcessTask(TaskSource::Transaction* transaction) override;
- // Synchronizes access to workers state.
- mutable CheckedLock worker_lock_{UniversalSuccessor()};
- // Current atomic state (atomic despite the lock to allow optimistic reads
- // without the lock).
- State state_ GUARDED_BY(worker_lock_);
- // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
- // hence the use of atomics.
- JoinFlag join_flag_ GUARDED_BY(worker_lock_);
- // Signaled when |join_flag_| is kWaiting* and a worker returns.
- std::unique_ptr<ConditionVariable> worker_released_condition_
- GUARDED_BY(worker_lock_);
- std::atomic<uint32_t> assigned_task_ids_{0};
- const Location from_here_;
- RepeatingCallback<size_t(size_t)> max_concurrency_callback_;
- // Worker task set by the job owner.
- RepeatingCallback<void(JobDelegate*)> worker_task_;
- // Task returned from TakeTask(), that calls |worker_task_| internally.
- RepeatingClosure primary_task_;
- const TimeTicks ready_time_;
- PooledTaskRunnerDelegate* delegate_;
- DISALLOW_COPY_AND_ASSIGN(JobTaskSource);
- };
- } // namespace internal
- } // namespace base
- #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
|