async_frontend.hpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2015.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file async_frontend.hpp
  9. * \author Andrey Semashev
  10. * \date 14.07.2009
  11. *
  12. * The header contains implementation of asynchronous sink frontend.
  13. */
  14. #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  15. #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
  16. #include <exception> // std::terminate
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #if defined(BOOST_LOG_NO_THREADS)
  22. #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
  23. #endif
  24. #include <boost/static_assert.hpp>
  25. #include <boost/memory_order.hpp>
  26. #include <boost/atomic/atomic.hpp>
  27. #include <boost/smart_ptr/shared_ptr.hpp>
  28. #include <boost/smart_ptr/make_shared_object.hpp>
  29. #include <boost/preprocessor/control/if.hpp>
  30. #include <boost/preprocessor/comparison/equal.hpp>
  31. #include <boost/thread/locks.hpp>
  32. #include <boost/thread/recursive_mutex.hpp>
  33. #include <boost/thread/thread.hpp>
  34. #include <boost/thread/condition_variable.hpp>
  35. #include <boost/log/exceptions.hpp>
  36. #include <boost/log/detail/locking_ptr.hpp>
  37. #include <boost/log/detail/parameter_tools.hpp>
  38. #include <boost/log/core/record_view.hpp>
  39. #include <boost/log/sinks/basic_sink_frontend.hpp>
  40. #include <boost/log/sinks/frontend_requirements.hpp>
  41. #include <boost/log/sinks/unbounded_fifo_queue.hpp>
  42. #include <boost/log/keywords/start_thread.hpp>
  43. #include <boost/log/detail/header.hpp>
  44. namespace boost {
  45. BOOST_LOG_OPEN_NAMESPACE
  46. namespace sinks {
  47. #ifndef BOOST_LOG_DOXYGEN_PASS
  48. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
  49. template< typename T0 >\
  50. explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
  51. base_type(true),\
  52. queue_base_type(arg0),\
  53. m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
  54. m_ActiveOperation(idle),\
  55. m_StopRequested(false),\
  56. m_FlushRequested(false)\
  57. {\
  58. if (arg0[keywords::start_thread | true])\
  59. start_feeding_thread();\
  60. }\
  61. template< typename T0 >\
  62. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
  63. base_type(true),\
  64. queue_base_type(arg0),\
  65. m_pBackend(backend),\
  66. m_ActiveOperation(idle),\
  67. m_StopRequested(false),\
  68. m_FlushRequested(false)\
  69. {\
  70. if (arg0[keywords::start_thread | true])\
  71. start_feeding_thread();\
  72. }
  73. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
  74. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  75. explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  76. base_type(true),\
  77. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  78. m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  79. m_ActiveOperation(idle),\
  80. m_StopRequested(false),\
  81. m_FlushRequested(false)\
  82. {\
  83. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  84. start_feeding_thread();\
  85. }\
  86. template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
  87. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
  88. base_type(true),\
  89. queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
  90. m_pBackend(backend),\
  91. m_ActiveOperation(idle),\
  92. m_StopRequested(false),\
  93. m_FlushRequested(false)\
  94. {\
  95. if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
  96. start_feeding_thread();\
  97. }
  98. #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
  99. BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
  100. #endif // BOOST_LOG_DOXYGEN_PASS
  101. /*!
  102. * \brief Asynchronous logging sink frontend
  103. *
  104. * The frontend starts a separate thread on construction. All logging records are passed
  105. * to the backend in this dedicated thread.
  106. *
  107. * The user can prevent spawning the internal thread by specifying \c start_thread parameter
  108. * with the value of \c false on construction. In this case log records will be buffered
  109. * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
  110. * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
  111. * parameter.
  112. */
  113. template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
  114. class asynchronous_sink :
  115. public aux::make_sink_frontend_base< SinkBackendT >::type,
  116. public QueueingStrategyT
  117. {
  118. typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
  119. typedef QueueingStrategyT queue_base_type;
  120. private:
  121. //! Backend synchronization mutex type
  122. typedef boost::recursive_mutex backend_mutex_type;
  123. //! Frontend synchronization mutex type
  124. typedef typename base_type::mutex_type frontend_mutex_type;
  125. //! Operation bit mask
  126. enum operation
  127. {
  128. idle = 0u,
  129. feeding_records = 1u,
  130. flushing = 3u
  131. };
  132. //! Function object to run the log record feeding thread
  133. class run_func
  134. {
  135. public:
  136. typedef void result_type;
  137. private:
  138. asynchronous_sink* m_self;
  139. public:
  140. explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
  141. {
  142. }
  143. result_type operator()() const
  144. {
  145. m_self->run();
  146. }
  147. };
  148. //! A scope guard that implements active operation management
  149. class scoped_feeding_opereation
  150. {
  151. private:
  152. asynchronous_sink& m_self;
  153. public:
  154. //! Initializing constructor
  155. explicit scoped_feeding_opereation(asynchronous_sink& self) : m_self(self)
  156. {
  157. }
  158. //! Destructor
  159. ~scoped_feeding_opereation()
  160. {
  161. m_self.complete_feeding_operation();
  162. }
  163. BOOST_DELETED_FUNCTION(scoped_feeding_opereation(scoped_feeding_opereation const&))
  164. BOOST_DELETED_FUNCTION(scoped_feeding_opereation& operator= (scoped_feeding_opereation const&))
  165. };
  166. //! A scope guard that resets a flag on destructor
  167. class scoped_flag
  168. {
  169. private:
  170. frontend_mutex_type& m_Mutex;
  171. condition_variable_any& m_Cond;
  172. boost::atomic< bool >& m_Flag;
  173. public:
  174. explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
  175. m_Mutex(mut), m_Cond(cond), m_Flag(f)
  176. {
  177. }
  178. ~scoped_flag()
  179. {
  180. try
  181. {
  182. lock_guard< frontend_mutex_type > lock(m_Mutex);
  183. m_Flag.store(false, boost::memory_order_relaxed);
  184. m_Cond.notify_all();
  185. }
  186. catch (...)
  187. {
  188. }
  189. }
  190. BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
  191. BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
  192. };
  193. public:
  194. //! Sink implementation type
  195. typedef SinkBackendT sink_backend_type;
  196. //! \cond
  197. BOOST_STATIC_ASSERT_MSG((has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value), "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
  198. //! \endcond
  199. #ifndef BOOST_LOG_DOXYGEN_PASS
  200. //! A pointer type that locks the backend until it's destroyed
  201. typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
  202. #else // BOOST_LOG_DOXYGEN_PASS
  203. //! A pointer type that locks the backend until it's destroyed
  204. typedef implementation_defined locked_backend_ptr;
  205. #endif // BOOST_LOG_DOXYGEN_PASS
  206. private:
  207. //! Synchronization mutex
  208. backend_mutex_type m_BackendMutex;
  209. //! Pointer to the backend
  210. const shared_ptr< sink_backend_type > m_pBackend;
  211. //! Dedicated record feeding thread
  212. thread m_DedicatedFeedingThread;
  213. //! Condition variable to implement blocking operations
  214. condition_variable_any m_BlockCond;
  215. //! Currently active operation
  216. operation m_ActiveOperation;
  217. //! The flag indicates that the feeding loop has to be stopped
  218. boost::atomic< bool > m_StopRequested;
  219. //! The flag indicates that queue flush has been requested
  220. boost::atomic< bool > m_FlushRequested;
  221. public:
  222. /*!
  223. * Default constructor. Constructs the sink backend instance.
  224. * Requires the backend to be default-constructible.
  225. *
  226. * \param start_thread If \c true, the frontend creates a thread to feed
  227. * log records to the backend. Otherwise no thread is
  228. * started and it is assumed that the user will call
  229. * \c run, \c feed_records or \c flush himself.
  230. */
  231. explicit asynchronous_sink(bool start_thread = true) :
  232. base_type(true),
  233. m_pBackend(boost::make_shared< sink_backend_type >()),
  234. m_ActiveOperation(idle),
  235. m_StopRequested(false),
  236. m_FlushRequested(false)
  237. {
  238. if (start_thread)
  239. start_feeding_thread();
  240. }
  241. /*!
  242. * Constructor attaches user-constructed backend instance
  243. *
  244. * \param backend Pointer to the backend instance.
  245. * \param start_thread If \c true, the frontend creates a thread to feed
  246. * log records to the backend. Otherwise no thread is
  247. * started and it is assumed that the user will call
  248. * \c run, \c feed_records or \c flush himself.
  249. *
  250. * \pre \a backend is not \c NULL.
  251. */
  252. explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
  253. base_type(true),
  254. m_pBackend(backend),
  255. m_ActiveOperation(idle),
  256. m_StopRequested(false),
  257. m_FlushRequested(false)
  258. {
  259. if (start_thread)
  260. start_feeding_thread();
  261. }
  262. /*!
  263. * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
  264. * Refer to the backend documentation for the list of supported parameters.
  265. *
  266. * The frontend uses the following named parameters:
  267. *
  268. * \li start_thread - If \c true, the frontend creates a thread to feed
  269. * log records to the backend. Otherwise no thread is
  270. * started and it is assumed that the user will call
  271. * \c run, \c feed_records or \c flush himself.
  272. */
  273. #ifndef BOOST_LOG_DOXYGEN_PASS
  274. BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
  275. #else
  276. template< typename... Args >
  277. explicit asynchronous_sink(Args&&... args);
  278. #endif
  279. /*!
  280. * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
  281. */
  282. ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
  283. {
  284. try
  285. {
  286. boost::this_thread::disable_interruption no_interrupts;
  287. stop();
  288. }
  289. catch (...)
  290. {
  291. std::terminate();
  292. }
  293. }
  294. /*!
  295. * Locking accessor to the attached backend
  296. */
  297. locked_backend_ptr locked_backend()
  298. {
  299. return locked_backend_ptr(m_pBackend, m_BackendMutex);
  300. }
  301. /*!
  302. * Enqueues the log record to the backend
  303. */
  304. void consume(record_view const& rec) BOOST_OVERRIDE
  305. {
  306. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  307. {
  308. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  309. // Wait until flush is done
  310. while (m_FlushRequested.load(boost::memory_order_acquire))
  311. m_BlockCond.wait(lock);
  312. }
  313. queue_base_type::enqueue(rec);
  314. }
  315. /*!
  316. * The method attempts to pass logging record to the backend
  317. */
  318. bool try_consume(record_view const& rec) BOOST_OVERRIDE
  319. {
  320. if (!m_FlushRequested.load(boost::memory_order_acquire))
  321. {
  322. return queue_base_type::try_enqueue(rec);
  323. }
  324. else
  325. return false;
  326. }
  327. /*!
  328. * The method starts record feeding loop and effectively blocks until either of this happens:
  329. *
  330. * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
  331. * \li an exception is thrown while processing a log record in the backend, and the exception is
  332. * not terminated by the exception handler, if one is installed
  333. *
  334. * \pre The sink frontend must be constructed without spawning a dedicated thread
  335. */
  336. void run()
  337. {
  338. // First check that no other thread is running
  339. {
  340. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  341. if (start_feeding_operation(lock, feeding_records))
  342. return;
  343. }
  344. scoped_feeding_opereation guard(*this);
  345. // Now start the feeding loop
  346. while (true)
  347. {
  348. do_feed_records();
  349. if (!m_StopRequested.load(boost::memory_order_acquire))
  350. {
  351. // Block until new record is available
  352. record_view rec;
  353. if (queue_base_type::dequeue_ready(rec))
  354. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  355. }
  356. else
  357. break;
  358. }
  359. }
  360. /*!
  361. * The method softly interrupts record feeding loop. This method must be called when \c run,
  362. * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
  363. * interruption, calling \c stop will not interrupt the record processing in the middle.
  364. * Instead, the sink frontend will attempt to finish its business with the record in progress
  365. * and return afterwards. This method can be called either if the sink was created with
  366. * an internal dedicated thread, or if the feeding loop was initiated by user.
  367. *
  368. * If no record feeding operation is in progress, calling \c stop marks the sink frontend
  369. * so that the next feeding operation stops immediately.
  370. *
  371. * \note Returning from this method does not guarantee that there are no records left buffered
  372. * in the sink frontend. It is possible that log records keep coming during and after this
  373. * method is called. At some point of execution of this method log records stop being processed,
  374. * and all records that come after this point are put into the queue. These records will be
  375. * processed upon further calls to \c run or \c feed_records.
  376. *
  377. * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
  378. * as \c false on frontend construction), this method does not guarantee that upon return the thread
  379. * has returned from the record feeding loop or that it won't enter it in the future. The method
  380. * only ensures that the record feeding thread will eventually return from the feeding loop. It is
  381. * user's responsibility to synchronize with the user's record feeding thread.
  382. */
  383. void stop()
  384. {
  385. boost::thread feeding_thread;
  386. {
  387. lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  388. m_StopRequested.store(true, boost::memory_order_release);
  389. queue_base_type::interrupt_dequeue();
  390. m_DedicatedFeedingThread.swap(feeding_thread);
  391. }
  392. if (feeding_thread.joinable())
  393. feeding_thread.join();
  394. }
  395. /*!
  396. * The method feeds log records that may have been buffered to the backend and returns
  397. *
  398. * \pre The sink frontend must be constructed without spawning a dedicated thread
  399. */
  400. void feed_records()
  401. {
  402. // First check that no other thread is running
  403. {
  404. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  405. if (start_feeding_operation(lock, feeding_records))
  406. return;
  407. }
  408. scoped_feeding_opereation guard(*this);
  409. // Now start the feeding loop
  410. do_feed_records();
  411. }
  412. /*!
  413. * The method feeds all log records that may have been buffered to the backend and returns.
  414. * Unlike \c feed_records, in case of ordering queueing the method also feeds records
  415. * that were enqueued during the ordering window, attempting to drain the queue completely.
  416. */
  417. void flush() BOOST_OVERRIDE
  418. {
  419. {
  420. unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
  421. if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
  422. {
  423. // There is already a thread feeding records, let it do the job
  424. m_FlushRequested.store(true, boost::memory_order_release);
  425. queue_base_type::interrupt_dequeue();
  426. while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
  427. m_BlockCond.wait(lock);
  428. // The condition may have been signalled when the feeding operation was finishing.
  429. // In that case records may not have been flushed, and we do the flush ourselves.
  430. if (m_ActiveOperation != idle)
  431. return;
  432. }
  433. m_ActiveOperation = flushing;
  434. m_FlushRequested.store(true, boost::memory_order_relaxed);
  435. }
  436. scoped_feeding_opereation guard(*this);
  437. do_feed_records();
  438. }
  439. private:
  440. #ifndef BOOST_LOG_DOXYGEN_PASS
  441. //! The method spawns record feeding thread
  442. void start_feeding_thread()
  443. {
  444. boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
  445. }
  446. //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
  447. bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
  448. {
  449. while (m_ActiveOperation != idle)
  450. {
  451. if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
  452. BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
  453. if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
  454. {
  455. m_StopRequested.store(false, boost::memory_order_relaxed);
  456. return true;
  457. }
  458. m_BlockCond.wait(lock);
  459. }
  460. m_ActiveOperation = op;
  461. return false;
  462. }
  463. //! Completes record feeding operation
  464. void complete_feeding_operation() BOOST_NOEXCEPT
  465. {
  466. try
  467. {
  468. lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
  469. m_ActiveOperation = idle;
  470. m_StopRequested.store(false, boost::memory_order_relaxed);
  471. m_BlockCond.notify_all();
  472. }
  473. catch (...)
  474. {
  475. }
  476. }
  477. //! The record feeding loop
  478. void do_feed_records()
  479. {
  480. while (!m_StopRequested.load(boost::memory_order_acquire))
  481. {
  482. record_view rec;
  483. bool dequeued = false;
  484. if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
  485. dequeued = queue_base_type::try_dequeue_ready(rec);
  486. else
  487. dequeued = queue_base_type::try_dequeue(rec);
  488. if (dequeued)
  489. base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
  490. else
  491. break;
  492. }
  493. if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
  494. {
  495. scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
  496. base_type::flush_backend(m_BackendMutex, *m_pBackend);
  497. }
  498. }
  499. #endif // BOOST_LOG_DOXYGEN_PASS
  500. };
  501. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
  502. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
  503. #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
  504. } // namespace sinks
  505. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  506. } // namespace boost
  507. #include <boost/log/detail/footer.hpp>
  508. #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_