123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659 |
- #ifndef RTC_BASE_THREAD_H_
- #define RTC_BASE_THREAD_H_
- #include <stdint.h>
- #include <list>
- #include <map>
- #include <memory>
- #include <queue>
- #include <set>
- #include <string>
- #include <type_traits>
- #include <vector>
- #if defined(WEBRTC_POSIX)
- #include <pthread.h>
- #endif
- #include "api/function_view.h"
- #include "api/task_queue/queued_task.h"
- #include "api/task_queue/task_queue_base.h"
- #include "rtc_base/constructor_magic.h"
- #include "rtc_base/deprecated/recursive_critical_section.h"
- #include "rtc_base/location.h"
- #include "rtc_base/message_handler.h"
- #include "rtc_base/platform_thread_types.h"
- #include "rtc_base/socket_server.h"
- #include "rtc_base/system/rtc_export.h"
- #include "rtc_base/thread_annotations.h"
- #include "rtc_base/thread_message.h"
- #if defined(WEBRTC_WIN)
- #include "rtc_base/win32.h"
- #endif
- namespace rtc {
- class Thread;
- namespace rtc_thread_internal {
- class MessageLikeTask : public MessageData {
- public:
- virtual void Run() = 0;
- };
- template <class FunctorT>
- class MessageWithFunctor final : public MessageLikeTask {
- public:
- explicit MessageWithFunctor(FunctorT&& functor)
- : functor_(std::forward<FunctorT>(functor)) {}
- void Run() override { functor_(); }
- private:
- ~MessageWithFunctor() override {}
- typename std::remove_reference<FunctorT>::type functor_;
- RTC_DISALLOW_COPY_AND_ASSIGN(MessageWithFunctor);
- };
- }
- class RTC_EXPORT ThreadManager {
- public:
- static const int kForever = -1;
-
- static ThreadManager* Instance();
- static void Add(Thread* message_queue);
- static void Remove(Thread* message_queue);
- static void Clear(MessageHandler* handler);
-
-
-
- static void ProcessAllMessageQueuesForTesting();
- Thread* CurrentThread();
- void SetCurrentThread(Thread* thread);
-
-
- void ChangeCurrentThreadForTest(Thread* thread);
-
-
-
-
-
-
-
-
-
-
-
-
-
- Thread* WrapCurrentThread();
- void UnwrapCurrentThread();
- bool IsMainThread();
- #if RTC_DCHECK_IS_ON
-
-
-
- void RegisterSendAndCheckForCycles(Thread* source, Thread* target);
- #endif
- private:
- ThreadManager();
- ~ThreadManager();
- void SetCurrentThreadInternal(Thread* thread);
- void AddInternal(Thread* message_queue);
- void RemoveInternal(Thread* message_queue);
- void ClearInternal(MessageHandler* handler);
- void ProcessAllMessageQueuesInternal();
- #if RTC_DCHECK_IS_ON
- void RemoveFromSendGraph(Thread* thread) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
- #endif
-
- std::vector<Thread*> message_queues_ RTC_GUARDED_BY(crit_);
-
-
-
- RecursiveCriticalSection crit_;
- size_t processing_ RTC_GUARDED_BY(crit_) = 0;
- #if RTC_DCHECK_IS_ON
-
-
-
- std::map<Thread*, std::set<Thread*>> send_graph_ RTC_GUARDED_BY(crit_);
- #endif
- #if defined(WEBRTC_POSIX)
- pthread_key_t key_;
- #endif
- #if defined(WEBRTC_WIN)
- const DWORD key_;
- #endif
-
- const PlatformThreadRef main_thread_ref_;
- RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
- };
- class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
- public:
- static const int kForever = -1;
-
-
-
-
-
- explicit Thread(SocketServer* ss);
- explicit Thread(std::unique_ptr<SocketServer> ss);
-
-
-
- Thread(SocketServer* ss, bool do_init);
- Thread(std::unique_ptr<SocketServer> ss, bool do_init);
-
-
-
-
-
-
-
-
- ~Thread() override;
- static std::unique_ptr<Thread> CreateWithSocketServer();
- static std::unique_ptr<Thread> Create();
- static Thread* Current();
-
-
-
-
- class ScopedDisallowBlockingCalls {
- public:
- ScopedDisallowBlockingCalls();
- ScopedDisallowBlockingCalls(const ScopedDisallowBlockingCalls&) = delete;
- ScopedDisallowBlockingCalls& operator=(const ScopedDisallowBlockingCalls&) =
- delete;
- ~ScopedDisallowBlockingCalls();
- private:
- Thread* const thread_;
- const bool previous_state_;
- };
- SocketServer* socketserver();
-
-
-
-
-
-
- virtual void Quit();
- virtual bool IsQuitting();
- virtual void Restart();
-
-
-
- virtual bool IsProcessingMessagesForTesting();
-
-
-
-
- virtual bool Get(Message* pmsg,
- int cmsWait = kForever,
- bool process_io = true);
- virtual bool Peek(Message* pmsg, int cmsWait = 0);
-
- virtual void Post(const Location& posted_from,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr,
- bool time_sensitive = false);
- virtual void PostDelayed(const Location& posted_from,
- int delay_ms,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr);
- virtual void PostAt(const Location& posted_from,
- int64_t run_at_ms,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr);
- virtual void Clear(MessageHandler* phandler,
- uint32_t id = MQID_ANY,
- MessageList* removed = nullptr);
- virtual void Dispatch(Message* pmsg);
-
- virtual int GetDelay();
- bool empty() const { return size() == 0u; }
- size_t size() const {
- CritScope cs(&crit_);
- return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
- }
-
- template <class T>
- void Dispose(T* doomed) {
- if (doomed) {
- Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
- }
- }
-
-
- sigslot::signal0<> SignalQueueDestroyed;
- bool IsCurrent() const;
-
-
-
- static bool SleepMs(int millis);
-
-
- const std::string& name() const { return name_; }
- bool SetName(const std::string& name, const void* obj);
-
- bool Start();
-
-
-
-
- virtual void Stop();
-
-
-
- virtual void Run();
- virtual void Send(const Location& posted_from,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr);
-
-
-
-
-
-
-
-
-
-
- template <
- class ReturnT,
- typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
- ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
- ReturnT result;
- InvokeInternal(posted_from, [functor, &result] { result = functor(); });
- return result;
- }
- template <
- class ReturnT,
- typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
- void Invoke(const Location& posted_from, FunctionView<void()> functor) {
- InvokeInternal(posted_from, functor);
- }
-
-
-
- void AllowInvokesToThread(Thread* thread);
-
- void DisallowAllInvokes();
-
-
-
-
- bool IsInvokeToThreadAllowed(rtc::Thread* target);
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- template <class FunctorT>
- void PostTask(const Location& posted_from, FunctorT&& functor) {
- Post(posted_from, GetPostTaskMessageHandler(), 0,
- new rtc_thread_internal::MessageWithFunctor<FunctorT>(
- std::forward<FunctorT>(functor)));
- }
- template <class FunctorT>
- void PostDelayedTask(const Location& posted_from,
- FunctorT&& functor,
- uint32_t milliseconds) {
- PostDelayed(posted_from, milliseconds, GetPostTaskMessageHandler(),
- 0,
- new rtc_thread_internal::MessageWithFunctor<FunctorT>(
- std::forward<FunctorT>(functor)));
- }
-
- void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
- void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
- uint32_t milliseconds) override;
- void Delete() override;
-
-
-
- bool ProcessMessages(int cms);
-
-
-
-
-
-
- bool IsOwned();
-
-
-
-
-
-
- bool RunningForTest() { return IsRunning(); }
-
-
-
-
-
-
- bool WrapCurrent();
- void UnwrapCurrent();
-
-
- void DisallowBlockingCalls() { SetAllowBlockingCalls(false); }
- #ifdef WEBRTC_ANDROID
-
-
-
- void DEPRECATED_AllowBlockingCalls() { SetAllowBlockingCalls(true); }
- #endif
- protected:
- class CurrentThreadSetter : CurrentTaskQueueSetter {
- public:
- explicit CurrentThreadSetter(Thread* thread)
- : CurrentTaskQueueSetter(thread),
- manager_(rtc::ThreadManager::Instance()),
- previous_(manager_->CurrentThread()) {
- manager_->ChangeCurrentThreadForTest(thread);
- }
- ~CurrentThreadSetter() { manager_->ChangeCurrentThreadForTest(previous_); }
- private:
- rtc::ThreadManager* const manager_;
- rtc::Thread* const previous_;
- };
-
-
- class DelayedMessage {
- public:
- DelayedMessage(int64_t delay,
- int64_t run_time_ms,
- uint32_t num,
- const Message& msg)
- : delay_ms_(delay),
- run_time_ms_(run_time_ms),
- message_number_(num),
- msg_(msg) {}
- bool operator<(const DelayedMessage& dmsg) const {
- return (dmsg.run_time_ms_ < run_time_ms_) ||
- ((dmsg.run_time_ms_ == run_time_ms_) &&
- (dmsg.message_number_ < message_number_));
- }
- int64_t delay_ms_;
- int64_t run_time_ms_;
-
-
- uint32_t message_number_;
- Message msg_;
- };
- class PriorityQueue : public std::priority_queue<DelayedMessage> {
- public:
- container_type& container() { return c; }
- void reheap() { make_heap(c.begin(), c.end(), comp); }
- };
- void DoDelayPost(const Location& posted_from,
- int64_t cmsDelay,
- int64_t tstamp,
- MessageHandler* phandler,
- uint32_t id,
- MessageData* pdata);
-
-
- void DoInit();
-
-
- void ClearInternal(MessageHandler* phandler,
- uint32_t id,
- MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
-
-
- void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
- void WakeUpSocketServer();
-
-
-
- void SafeWrapCurrent();
-
- void Join();
- static void AssertBlockingIsAllowedOnCurrentThread();
- friend class ScopedDisallowBlockingCalls;
- RecursiveCriticalSection* CritForTest() { return &crit_; }
- private:
- class QueuedTaskHandler final : public MessageHandler {
- public:
- QueuedTaskHandler() {}
- void OnMessage(Message* msg) override;
- };
-
-
- bool SetAllowBlockingCalls(bool allow);
- #if defined(WEBRTC_WIN)
- static DWORD WINAPI PreRun(LPVOID context);
- #else
- static void* PreRun(void* pv);
- #endif
-
-
-
-
-
- bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,
- bool need_synchronize_access);
-
- bool IsRunning();
- void InvokeInternal(const Location& posted_from,
- rtc::FunctionView<void()> functor);
-
- void EnsureIsCurrentTaskQueue();
-
- void ClearCurrentTaskQueue();
-
-
- static MessageHandler* GetPostTaskMessageHandler();
- bool fPeekKeep_;
- Message msgPeek_;
- MessageList messages_ RTC_GUARDED_BY(crit_);
- PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
- uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
- #if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
- std::vector<Thread*> allowed_threads_ RTC_GUARDED_BY(this);
- bool invoke_policy_enabled_ RTC_GUARDED_BY(this) = false;
- #endif
- RecursiveCriticalSection crit_;
- bool fInitialized_;
- bool fDestroyed_;
- volatile int stop_;
-
- SocketServer* const ss_;
-
- std::unique_ptr<SocketServer> own_ss_;
- std::string name_;
-
-
- #if defined(WEBRTC_POSIX)
- pthread_t thread_ = 0;
- #endif
- #if defined(WEBRTC_WIN)
- HANDLE thread_ = nullptr;
- DWORD thread_id_ = 0;
- #endif
-
-
-
- bool owned_ = true;
-
- bool blocking_calls_allowed_ = true;
-
- QueuedTaskHandler queued_task_handler_;
- std::unique_ptr<TaskQueueBase::CurrentTaskQueueSetter>
- task_queue_registration_;
- friend class ThreadManager;
- RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
- };
- class AutoThread : public Thread {
- public:
- AutoThread();
- ~AutoThread() override;
- private:
- RTC_DISALLOW_COPY_AND_ASSIGN(AutoThread);
- };
- class AutoSocketServerThread : public Thread {
- public:
- explicit AutoSocketServerThread(SocketServer* ss);
- ~AutoSocketServerThread() override;
- private:
- rtc::Thread* old_thread_;
- RTC_DISALLOW_COPY_AND_ASSIGN(AutoSocketServerThread);
- };
- }
- #endif
|