123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- // Copyright 2016 The Chromium Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
- #define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
- #include <stddef.h>
- #include <memory>
- #include <string>
- #include <vector>
- #include "base/base_export.h"
- #include "base/check_op.h"
- #include "base/compiler_specific.h"
- #include "base/containers/stack.h"
- #include "base/gtest_prod_util.h"
- #include "base/macros.h"
- #include "base/memory/ref_counted.h"
- #include "base/optional.h"
- #include "base/sequenced_task_runner.h"
- #include "base/strings/string_piece.h"
- #include "base/synchronization/condition_variable.h"
- #include "base/synchronization/waitable_event.h"
- #include "base/task/thread_pool/task.h"
- #include "base/task/thread_pool/task_source.h"
- #include "base/task/thread_pool/thread_group.h"
- #include "base/task/thread_pool/tracked_ref.h"
- #include "base/task/thread_pool/worker_thread.h"
- #include "base/task/thread_pool/worker_thread_stack.h"
- #include "base/time/time.h"
- namespace base {
- class HistogramBase;
- class WorkerThreadObserver;
- namespace internal {
- class TaskTracker;
- // A group of workers that run Tasks.
- //
- // The thread group doesn't create threads until Start() is called. Tasks can be
- // posted at any time but will not run until after Start() is called.
- //
- // This class is thread-safe.
- class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
- public:
- // Constructs a group without workers.
- //
- // |histogram_label| is used to label the thread group's histograms as
- // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes.
- // It must not be empty. |thread group_label| is used to label the thread
- // group's threads, it must not be empty. |priority_hint| is the preferred
- // thread priority; the actual thread priority depends on shutdown state and
- // platform capabilities. |task_tracker| keeps track of tasks.
- ThreadGroupImpl(StringPiece histogram_label,
- StringPiece thread_group_label,
- ThreadPriority priority_hint,
- TrackedRef<TaskTracker> task_tracker,
- TrackedRef<Delegate> delegate);
- // Creates threads, allowing existing and future tasks to run. The thread
- // group runs at most |max_tasks| / |max_best_effort_tasks| unblocked task
- // with any / BEST_EFFORT priority concurrently. It reclaims unused threads
- // after |suggested_reclaim_time|. It uses |service_thread_task_runner| to
- // monitor for blocked tasks. If specified, it notifies
- // |worker_thread_observer| when a worker enters and exits its main function
- // (the observer must not be destroyed before JoinForTesting() has returned).
- // |worker_environment| specifies the environment in which tasks are executed.
- // |may_block_threshold| is the timeout after which a task in a MAY_BLOCK
- // ScopedBlockingCall is considered blocked (the thread group will choose an
- // appropriate value if none is specified).
- // |synchronous_thread_start_for_testing| is true if this ThreadGroupImpl
- // should synchronously wait for OnMainEntry() after starting each worker. Can
- // only be called once. CHECKs on failure.
- void Start(int max_tasks,
- int max_best_effort_tasks,
- TimeDelta suggested_reclaim_time,
- scoped_refptr<SequencedTaskRunner> service_thread_task_runner,
- WorkerThreadObserver* worker_thread_observer,
- WorkerEnvironment worker_environment,
- bool synchronous_thread_start_for_testing = false,
- Optional<TimeDelta> may_block_threshold = Optional<TimeDelta>());
- // Destroying a ThreadGroupImpl returned by Create() is not allowed in
- // production; it is always leaked. In tests, it can only be destroyed after
- // JoinForTesting() has returned.
- ~ThreadGroupImpl() override;
- // ThreadGroup:
- void JoinForTesting() override;
- size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
- void DidUpdateCanRunPolicy() override;
- const HistogramBase* num_tasks_before_detach_histogram() const {
- return num_tasks_before_detach_histogram_;
- }
- // Waits until at least |n| workers are idle. Note that while workers are
- // disallowed from cleaning up during this call: tests using a custom
- // |suggested_reclaim_time_| need to be careful to invoke this swiftly after
- // unblocking the waited upon workers as: if a worker is already detached by
- // the time this is invoked, it will never make it onto the idle stack and
- // this call will hang.
- void WaitForWorkersIdleForTesting(size_t n);
- // Waits until at least |n| workers are idle.
- void WaitForWorkersIdleLockRequiredForTesting(size_t n)
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Waits until all workers are idle.
- void WaitForAllWorkersIdleForTesting();
- // Waits until |n| workers have cleaned up (went through
- // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
- // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
- void WaitForWorkersCleanedUpForTesting(size_t n);
- // Returns the number of workers in this thread group.
- size_t NumberOfWorkersForTesting() const;
- // Returns |max_tasks_|/|max_best_effort_tasks_|.
- size_t GetMaxTasksForTesting() const;
- size_t GetMaxBestEffortTasksForTesting() const;
- // Returns the number of workers that are idle (i.e. not running tasks).
- size_t NumberOfIdleWorkersForTesting() const;
- private:
- class ScopedCommandsExecutor;
- class WorkerThreadDelegateImpl;
- // Friend tests so that they can access |blocked_workers_poll_period| and
- // may_block_threshold().
- friend class ThreadGroupImplBlockingTest;
- friend class ThreadGroupImplMayBlockTest;
- FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
- ThreadBlockUnblockPremature);
- FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
- ThreadBlockUnblockPrematureBestEffort);
- // ThreadGroup:
- void UpdateSortKey(TaskSource::Transaction transaction) override;
- void PushTaskSourceAndWakeUpWorkers(
- TransactionWithRegisteredTaskSource transaction_with_task_source)
- override;
- void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
- override EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Creates a worker and schedules its start, if needed, to maintain one idle
- // worker, |max_tasks_| permitting.
- void MaintainAtLeastOneIdleWorkerLockRequired(
- ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Returns true if worker cleanup is permitted.
- bool CanWorkerCleanupForTestingLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Creates a worker, adds it to the thread group, schedules its start and
- // returns it. Cannot be called before Start().
- scoped_refptr<WorkerThread> CreateAndRegisterWorkerLockRequired(
- ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Returns the number of workers that are awake (i.e. not on the idle stack).
- size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Returns the desired number of awake workers, given current workload and
- // concurrency limits.
- size_t GetDesiredNumAwakeWorkersLockRequired() const
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Examines the list of WorkerThreads and increments |max_tasks_| for each
- // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
- // more than BlockedThreshold(). Reschedules a call if necessary.
- void AdjustMaxTasks();
- // Returns the threshold after which the max tasks is increased to compensate
- // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
- TimeDelta may_block_threshold_for_testing() const {
- return after_start().may_block_threshold;
- }
- // Interval at which the service thread checks for workers in this thread
- // group that have been in a MAY_BLOCK ScopedBlockingCall for more than
- // may_block_threshold().
- TimeDelta blocked_workers_poll_period_for_testing() const {
- return after_start().blocked_workers_poll_period;
- }
- // Starts calling AdjustMaxTasks() periodically on
- // |service_thread_task_runner_|.
- void ScheduleAdjustMaxTasks();
- // Schedules AdjustMaxTasks() through |executor| if required.
- void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor)
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Returns true if AdjustMaxTasks() should periodically be called on
- // |service_thread_task_runner_|.
- bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Updates the minimum priority allowed to run below which tasks should yield.
- // This should be called whenever |num_running_tasks_| or |max_tasks| changes,
- // or when a new task is added to |priority_queue_|.
- void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Increments/decrements the number of tasks of |priority| that are currently
- // running in this thread group. Must be invoked before/after running a task.
- void DecrementTasksRunningLockRequired(TaskPriority priority)
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- void IncrementTasksRunningLockRequired(TaskPriority priority)
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Increments/decrements the number of [best effort] tasks that can run in
- // this thread group.
- void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
- void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
- void DecrementMaxBestEffortTasksLockRequired()
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- void IncrementMaxBestEffortTasksLockRequired()
- EXCLUSIVE_LOCKS_REQUIRED(lock_);
- // Values set at Start() and never modified afterwards.
- struct InitializedInStart {
- InitializedInStart();
- ~InitializedInStart();
- #if DCHECK_IS_ON()
- // Set after all members of this struct are set.
- bool initialized = false;
- #endif
- // Initial value of |max_tasks_|.
- size_t initial_max_tasks = 0;
- // Suggested reclaim time for workers.
- TimeDelta suggested_reclaim_time;
- // Environment to be initialized per worker.
- WorkerEnvironment worker_environment = WorkerEnvironment::NONE;
- scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
- // Optional observer notified when a worker enters and exits its main.
- WorkerThreadObserver* worker_thread_observer = nullptr;
- bool may_block_without_delay;
- // Threshold after which the max tasks is increased to compensate for a
- // worker that is within a MAY_BLOCK ScopedBlockingCall.
- TimeDelta may_block_threshold;
- // The period between calls to AdjustMaxTasks() when the thread group is at
- // capacity.
- TimeDelta blocked_workers_poll_period;
- } initialized_in_start_;
- InitializedInStart& in_start() {
- #if DCHECK_IS_ON()
- DCHECK(!initialized_in_start_.initialized);
- #endif
- return initialized_in_start_;
- }
- const InitializedInStart& after_start() const {
- #if DCHECK_IS_ON()
- DCHECK(initialized_in_start_.initialized);
- #endif
- return initialized_in_start_;
- }
- const std::string thread_group_label_;
- const ThreadPriority priority_hint_;
- // All workers owned by this thread group.
- std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);
- // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
- // concurrently in this thread group.
- size_t max_tasks_ GUARDED_BY(lock_) = 0;
- size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;
- // Number of tasks of any priority / BEST_EFFORT priority that are currently
- // running in this thread group.
- size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
- size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;
- // Number of workers running a task of any priority / BEST_EFFORT priority
- // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
- // caused a max tasks increase yet.
- int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
- int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;
- // Stack of idle workers. Initially, all workers are on this stack. A worker
- // is removed from the stack before its WakeUp() function is called and when
- // it receives work from GetWork() (a worker calls GetWork() when its sleep
- // timeout expires, even if its WakeUp() method hasn't been called). A worker
- // is pushed on this stack when it receives nullptr from GetWork().
- WorkerThreadStack idle_workers_stack_ GUARDED_BY(lock_);
- // Signaled when a worker is added to the idle workers stack.
- std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_
- GUARDED_BY(lock_);
- // Stack that contains the timestamps of when workers get cleaned up.
- // Timestamps get popped off the stack as new workers are added.
- base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_
- GUARDED_BY(lock_);
- // Whether an AdjustMaxTasks() task was posted to the service thread.
- bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;
- // Indicates to the delegates that workers are not permitted to cleanup.
- bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;
- // Counts the number of workers cleaned up (went through
- // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
- // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
- // |some_workers_cleaned_up_for_testing_| is true if this was ever
- // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
- // specific number of workers being cleaned up via
- // WaitForWorkersCleanedUpForTesting().
- size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
- #if DCHECK_IS_ON()
- bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
- #endif
- // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
- // incremented.
- std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
- GUARDED_BY(lock_);
- // Set at the start of JoinForTesting().
- bool join_for_testing_started_ GUARDED_BY(lock_) = false;
- // Null-opt unless |synchronous_thread_start_for_testing| was true at
- // construction. In that case, it's signaled each time
- // WorkerThreadDelegateImpl::OnMainEntry() completes.
- Optional<WaitableEvent> worker_started_for_testing_;
- // Cached HistogramBase pointers, can be accessed without
- // holding |lock_|. If |lock_| is held, add new samples using
- // ThreadGroupImpl::ScopedCommandsExecutor (increase
- // |scheduled_histogram_samples_| size as needed) to defer until after |lock_|
- // release, due to metrics system callbacks which may schedule tasks.
- // ThreadPool.DetachDuration.[thread group name] histogram. Intentionally
- // leaked.
- HistogramBase* const detach_duration_histogram_;
- // ThreadPool.NumTasksBeforeDetach.[thread group name] histogram.
- // Intentionally leaked.
- HistogramBase* const num_tasks_before_detach_histogram_;
- // Ensures recently cleaned up workers (ref.
- // WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
- // they have a raw reference to |this| (and to TaskTracker) which can
- // otherwise result in racy use-after-frees per no longer being part of
- // |workers_| and hence not being explicitly joined in JoinForTesting():
- // https://crbug.com/810464. Uses AtomicRefCount to make its only public
- // method thread-safe.
- TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
- DISALLOW_COPY_AND_ASSIGN(ThreadGroupImpl);
- };
- } // namespace internal
- } // namespace base
- #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
|