close.hpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/websocket/detail/mask.hpp>
  13. #include <boost/beast/websocket/impl/stream_impl.hpp>
  14. #include <boost/beast/core/async_base.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/detail/bind_continuation.hpp>
  18. #include <boost/asio/coroutine.hpp>
  19. #include <boost/asio/post.hpp>
  20. #include <boost/throw_exception.hpp>
  21. #include <memory>
  22. namespace boost {
  23. namespace beast {
  24. namespace websocket {
  25. /* Close the WebSocket Connection
  26. This composed operation sends the close frame if it hasn't already
  27. been sent, then reads and discards frames until receiving a close
  28. frame. Finally it invokes the teardown operation to shut down the
  29. underlying connection.
  30. */
  // Composed asynchronous operation behind async_close.
  //
  // Sequence implemented by operator() below:
  //   1. acquire the write lock (impl.wr_block),
  //   2. write the serialized close frame held in fb_,
  //   3. unless a close frame was already received (impl.rd_close),
  //      acquire the read lock and read/discard frames until a close
  //      frame arrives or a parse error is recorded in ev_,
  //   4. run async_teardown on the next layer,
  //   5. release the locks, give suspended operations a chance to run,
  //      and complete the handler with the final error code.
  //
  // The operation is an asio::coroutine: each BOOST_ASIO_CORO_YIELD moves
  // *this into an asynchronous call (or into a slot in impl) and returns;
  // the next invocation of operator() continues after that yield.
  31. template<class NextLayer, bool deflateSupported>
  32. template<class Handler>
  33. class stream<NextLayer, deflateSupported>::close_op
  34. : public beast::stable_async_base<
  35. Handler, beast::executor_type<stream>>
  36. , public asio::coroutine
  37. {
  // Weak reference to the stream implementation. It is locked at the top
  // of every invocation; if it has expired the operation completes with
  // operation_aborted.
  38. boost::weak_ptr<impl_type> wp_;
  // Error produced while parsing a frame header or a close payload.
  // It is reported only if teardown itself yields no error.
  39. error_code ev_;
  // Buffer holding the serialized close frame. Obtained from
  // allocate_stable, so the reference stays usable across moves of *this.
  40. detail::frame_buffer& fb_;
  41. public:
  42. static constexpr int id = 5; // for soft_mutex
  // Constructs the operation, serializes the close frame for `cr` into
  // fb_, and immediately starts the coroutine.
  //   h  - completion handler, forwarded to the base class
  //   sp - shared pointer to the stream implementation (stored weakly)
  //   cr - close reason to encode into the outgoing close frame
  43. template<class Handler_>
  44. close_op(
  45. Handler_&& h,
  46. boost::shared_ptr<impl_type> const& sp,
  47. close_reason const& cr)
  48. : stable_async_base<Handler,
  49. beast::executor_type<stream>>(
  50. std::forward<Handler_>(h),
  51. sp->stream().get_executor())
  52. , wp_(sp)
  53. , fb_(beast::allocate_stable<
  54. detail::frame_buffer>(*this))
  55. {
  56. // Serialize the close frame
  57. sp->template write_close<
  58. flat_static_buffer_base>(fb_, cr);
  // First entry into the coroutine; cont is false for this initial call.
  59. (*this)({}, 0, false);
  60. }
  // Coroutine body; also the completion handler for every intermediate
  // asynchronous step.
  //   ec                - result of the step that just finished
  //   bytes_transferred - bytes read by the preceding async_read_some
  //                       (committed to impl.rd_buf after read yields)
  //   cont              - forwarded to complete(); false only for the
  //                       initial call made from the constructor
  61. void
  62. operator()(
  63. error_code ec = {},
  64. std::size_t bytes_transferred = 0,
  65. bool cont = true)
  66. {
  67. using beast::detail::clamp;
  // Bail out if the stream implementation no longer exists.
  68. auto sp = wp_.lock();
  69. if(! sp)
  70. {
  71. ec = net::error::operation_aborted;
  72. return this->complete(cont, ec);
  73. }
  74. auto& impl = *sp;
  75. BOOST_ASIO_CORO_REENTER(*this)
  76. {
  77. // Acquire the write lock
  78. if(! impl.wr_block.try_lock(this))
  79. {
  // Lock is held elsewhere: park this operation in impl.op_close and
  // return. NOTE(review): presumably resumed by whichever operation
  // releases the write lock - confirm against the other ops.
  80. BOOST_ASIO_CORO_YIELD
  81. {
  82. BOOST_ASIO_HANDLER_LOCATION((
  83. __FILE__, __LINE__,
  84. "websocket::async_close"));
  85. impl.op_close.emplace(std::move(*this));
  86. }
  // Resumed: take the lock, then post so that the rest of the operation
  // runs from a posted handler rather than inline in the resumer.
  87. impl.wr_block.lock(this);
  88. BOOST_ASIO_CORO_YIELD
  89. {
  90. BOOST_ASIO_HANDLER_LOCATION((
  91. __FILE__, __LINE__,
  92. "websocket::async_close"));
  93. net::post(std::move(*this));
  94. }
  95. BOOST_ASSERT(impl.wr_block.is_locked(this));
  96. }
  97. if(impl.check_stop_now(ec))
  98. goto upcall;
  99. // Can't call close twice
  100. // TODO return a custom error code
  101. BOOST_ASSERT(! impl.wr_close);
  102. // Send close frame
  103. impl.wr_close = true;
  104. impl.change_status(status::closing);
  105. impl.update_timer(this->get_executor());
  106. BOOST_ASIO_CORO_YIELD
  107. {
  108. BOOST_ASIO_HANDLER_LOCATION((
  109. __FILE__, __LINE__,
  110. "websocket::async_close"));
  111. net::async_write(impl.stream(), fb_.data(),
  112. beast::detail::bind_continuation(std::move(*this)));
  113. }
  114. if(impl.check_stop_now(ec))
  115. goto upcall;
  116. if(impl.rd_close)
  117. {
  118. // This happens when the read_op gets a close frame
  119. // at the same time close_op is sending the close frame.
  120. // The read_op will be suspended on the write block.
  121. goto teardown;
  122. }
  123. // Acquire the read lock
  124. if(! impl.rd_block.try_lock(this))
  125. {
  // Same park / lock / post sequence as for the write lock, using the
  // impl.op_r_close slot.
  126. BOOST_ASIO_CORO_YIELD
  127. {
  128. BOOST_ASIO_HANDLER_LOCATION((
  129. __FILE__, __LINE__,
  130. "websocket::async_close"));
  131. impl.op_r_close.emplace(std::move(*this));
  132. }
  133. impl.rd_block.lock(this);
  134. BOOST_ASIO_CORO_YIELD
  135. {
  136. BOOST_ASIO_HANDLER_LOCATION((
  137. __FILE__, __LINE__,
  138. "websocket::async_close"));
  139. net::post(std::move(*this));
  140. }
  141. BOOST_ASSERT(impl.rd_block.is_locked(this));
  142. if(impl.check_stop_now(ec))
  143. goto upcall;
  144. BOOST_ASSERT(! impl.rd_close);
  145. }
  146. // Read until receiving a close frame
  147. // TODO There should be a timeout on this
  // If part of a message frame payload is still outstanding, jump into
  // the loop body to discard it before parsing the next header.
  148. if(impl.rd_remain > 0)
  149. goto read_payload;
  150. for(;;)
  151. {
  152. // Read frame header
  // Loop until parse_fh reports a complete header; a parse error in ev_
  // skips straight to teardown, otherwise more bytes are read.
  153. while(! impl.parse_fh(
  154. impl.rd_fh, impl.rd_buf, ev_))
  155. {
  156. if(ev_)
  157. goto teardown;
  158. BOOST_ASIO_CORO_YIELD
  159. {
  160. BOOST_ASIO_HANDLER_LOCATION((
  161. __FILE__, __LINE__,
  162. "websocket::async_close"));
  163. impl.stream().async_read_some(
  164. impl.rd_buf.prepare(read_size(
  165. impl.rd_buf, impl.rd_buf.max_size())),
  166. beast::detail::bind_continuation(std::move(*this)));
  167. }
  168. impl.rd_buf.commit(bytes_transferred);
  169. if(impl.check_stop_now(ec))
  170. goto upcall;
  171. }
  172. if(detail::is_control(impl.rd_fh.op))
  173. {
  174. // Discard ping or pong frame
  175. if(impl.rd_fh.op != detail::opcode::close)
  176. {
  177. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  178. continue;
  179. }
  180. // Process close frame
  181. // TODO Should we invoke the control callback?
  182. BOOST_ASSERT(! impl.rd_close);
  183. impl.rd_close = true;
  // View of the close payload inside rd_buf; unmasked in place when the
  // frame is masked, then decoded into impl.cr.
  184. auto const mb = buffers_prefix(
  185. clamp(impl.rd_fh.len),
  186. impl.rd_buf.data());
  187. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  188. detail::mask_inplace(mb, impl.rd_key);
  189. detail::read_close(impl.cr, mb, ev_);
  // On a decode error the payload is left unconsumed and ev_ is reported
  // after teardown.
  190. if(ev_)
  191. goto teardown;
  192. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  193. goto teardown;
  194. }
  195. read_payload:
  196. // Discard message frame
  // Drop everything buffered, reducing rd_remain accordingly, and keep
  // reading until the buffer holds at least the rest of the payload.
  197. while(impl.rd_buf.size() < impl.rd_remain)
  198. {
  199. impl.rd_remain -= impl.rd_buf.size();
  200. impl.rd_buf.consume(impl.rd_buf.size());
  201. BOOST_ASIO_CORO_YIELD
  202. {
  203. BOOST_ASIO_HANDLER_LOCATION((
  204. __FILE__, __LINE__,
  205. "websocket::async_close"));
  206. impl.stream().async_read_some(
  207. impl.rd_buf.prepare(read_size(
  208. impl.rd_buf, impl.rd_buf.max_size())),
  209. beast::detail::bind_continuation(std::move(*this)));
  210. }
  211. impl.rd_buf.commit(bytes_transferred);
  212. if(impl.check_stop_now(ec))
  213. goto upcall;
  214. }
  215. BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
  216. impl.rd_buf.consume(clamp(impl.rd_remain));
  217. impl.rd_remain = 0;
  218. }
  219. teardown:
  220. // Teardown
  221. BOOST_ASSERT(impl.wr_block.is_locked(this));
  222. using beast::websocket::async_teardown;
  223. BOOST_ASIO_CORO_YIELD
  224. {
  225. BOOST_ASIO_HANDLER_LOCATION((
  226. __FILE__, __LINE__,
  227. "websocket::async_close"));
  228. async_teardown(impl.role, impl.stream(),
  229. beast::detail::bind_continuation(std::move(*this)));
  230. }
  231. BOOST_ASSERT(impl.wr_block.is_locked(this));
  // An eof result from teardown is treated as success.
  232. if(ec == net::error::eof)
  233. {
  234. // Rationale:
  235. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  236. ec = {};
  237. }
  // A teardown error takes precedence; otherwise surface any error that
  // was recorded while parsing (ev_).
  238. if(! ec)
  239. ec = ev_;
  240. if(ec)
  241. impl.change_status(status::failed);
  242. else
  243. impl.change_status(status::closed);
  244. impl.close();
  245. upcall:
  // Release the write lock, release the read lock if try_unlock succeeds
  // for this operation, and then offer suspended operations a chance to
  // run. The || chain short-circuits on the first maybe_invoke() that
  // returns true. NOTE(review): presumably true means "an operation was
  // resumed" - confirm against the pausation implementation.
  246. impl.wr_block.unlock(this);
  247. impl.rd_block.try_unlock(this)
  248. && impl.op_r_rd.maybe_invoke();
  249. impl.op_rd.maybe_invoke()
  250. || impl.op_idle_ping.maybe_invoke()
  251. || impl.op_ping.maybe_invoke()
  252. || impl.op_wr.maybe_invoke();
  // Deliver the final result to the caller's handler.
  253. this->complete(cont, ec);
  254. }
  255. }
  256. };
  257. template<class NextLayer, bool deflateSupported>
  258. struct stream<NextLayer, deflateSupported>::
  259. run_close_op
  260. {
  261. template<class CloseHandler>
  262. void
  263. operator()(
  264. CloseHandler&& h,
  265. boost::shared_ptr<impl_type> const& sp,
  266. close_reason const& cr)
  267. {
  268. // If you get an error on the following line it means
  269. // that your handler does not meet the documented type
  270. // requirements for the handler.
  271. static_assert(
  272. beast::detail::is_invocable<CloseHandler,
  273. void(error_code)>::value,
  274. "CloseHandler type requirements not met");
  275. close_op<
  276. typename std::decay<CloseHandler>::type>(
  277. std::forward<CloseHandler>(h),
  278. sp,
  279. cr);
  280. }
  281. };
  282. //------------------------------------------------------------------------------
  283. template<class NextLayer, bool deflateSupported>
  284. void
  285. stream<NextLayer, deflateSupported>::
  286. close(close_reason const& cr)
  287. {
  288. static_assert(is_sync_stream<next_layer_type>::value,
  289. "SyncStream type requirements not met");
  290. error_code ec;
  291. close(cr, ec);
  292. if(ec)
  293. BOOST_THROW_EXCEPTION(system_error{ec});
  294. }
  // Synchronous close, reporting the outcome through `ec`.
  //
  // Writes a close frame carrying `cr`, then reads from the next layer,
  // discarding message frames and ping/pong frames, until a close frame
  // is parsed. Parse errors are handed to do_fail together with `ec`.
  //   cr - close reason to send to the peer
  //   ec - cleared on entry; set to the error, if any, on return
  // NOTE(review): do_fail is defined outside this file section; it
  // presumably tears down the connection and stores the result in ec -
  // confirm against its definition.
  295. template<class NextLayer, bool deflateSupported>
  296. void
  297. stream<NextLayer, deflateSupported>::
  298. close(close_reason const& cr, error_code& ec)
  299. {
  300. static_assert(is_sync_stream<next_layer_type>::value,
  301. "SyncStream type requirements not met");
  302. using beast::detail::clamp;
  303. auto& impl = *impl_;
  304. ec = {};
  305. if(impl.check_stop_now(ec))
  306. return;
  // Asserted precondition: no close frame has been received yet.
  307. BOOST_ASSERT(! impl.rd_close);
  308. // Can't call close twice
  309. // TODO return a custom error code
  310. BOOST_ASSERT(! impl.wr_close);
  311. // Send close frame
  312. {
  313. impl.wr_close = true;
  314. impl.change_status(status::closing);
  // The frame buffer only needs to live for the duration of the write.
  315. detail::frame_buffer fb;
  316. impl.template write_close<flat_static_buffer_base>(fb, cr);
  317. net::write(impl.stream(), fb.data(), ec);
  318. if(impl.check_stop_now(ec))
  319. return;
  320. }
  321. // Read until receiving a close frame
  // `ev` collects parse errors separately from the I/O result in `ec`.
  322. error_code ev;
  // If part of a message frame payload is still outstanding, jump into
  // the loop body to discard it before parsing the next header.
  323. if(impl.rd_remain > 0)
  324. goto read_payload;
  325. for(;;)
  326. {
  327. // Read frame header
  // Keep reading until parse_fh reports a complete header.
  328. while(! impl.parse_fh(
  329. impl.rd_fh, impl.rd_buf, ev))
  330. {
  331. if(ev)
  332. {
  333. // Protocol violation
  334. return do_fail(close_code::none, ev, ec);
  335. }
  336. impl.rd_buf.commit(impl.stream().read_some(
  337. impl.rd_buf.prepare(read_size(
  338. impl.rd_buf, impl.rd_buf.max_size())), ec));
  339. if(impl.check_stop_now(ec))
  340. return;
  341. }
  342. if(detail::is_control(impl.rd_fh.op))
  343. {
  344. // Discard ping/pong frame
  345. if(impl.rd_fh.op != detail::opcode::close)
  346. {
  347. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  348. continue;
  349. }
  350. // Handle close frame
  351. // TODO Should we invoke the control callback?
  352. BOOST_ASSERT(! impl.rd_close);
  353. impl.rd_close = true;
  // View of the close payload inside rd_buf; unmasked in place when the
  // frame is masked, then decoded into impl.cr.
  354. auto const mb = buffers_prefix(
  355. clamp(impl.rd_fh.len),
  356. impl.rd_buf.data());
  357. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  358. detail::mask_inplace(mb, impl.rd_key);
  359. detail::read_close(impl.cr, mb, ev);
  360. if(ev)
  361. {
  362. // Protocol violation
  363. return do_fail(close_code::none, ev, ec);
  364. }
  365. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  // Close frame received and consumed: leave the read loop.
  366. break;
  367. }
  368. read_payload:
  369. // Discard message frame
  // Drop everything buffered, reducing rd_remain accordingly, and keep
  // reading until the buffer holds at least the rest of the payload.
  370. while(impl.rd_buf.size() < impl.rd_remain)
  371. {
  372. impl.rd_remain -= impl.rd_buf.size();
  373. impl.rd_buf.consume(impl.rd_buf.size());
  374. impl.rd_buf.commit(
  375. impl.stream().read_some(
  376. impl.rd_buf.prepare(
  377. read_size(
  378. impl.rd_buf,
  379. impl.rd_buf.max_size())),
  380. ec));
  381. if(impl.check_stop_now(ec))
  382. return;
  383. }
  384. BOOST_ASSERT(
  385. impl.rd_buf.size() >= impl.rd_remain);
  386. impl.rd_buf.consume(clamp(impl.rd_remain));
  387. impl.rd_remain = 0;
  388. }
  389. // _Close the WebSocket Connection_
  // error::closed is the expected outcome here, so it is reported to
  // the caller as success.
  390. do_fail(close_code::none, error::closed, ec);
  391. if(ec == error::closed)
  392. ec = {};
  393. }
  394. template<class NextLayer, bool deflateSupported>
  395. template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
  396. BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
  397. stream<NextLayer, deflateSupported>::
  398. async_close(close_reason const& cr, CloseHandler&& handler)
  399. {
  400. static_assert(is_async_stream<next_layer_type>::value,
  401. "AsyncStream type requirements not met");
  402. return net::async_initiate<
  403. CloseHandler,
  404. void(error_code)>(
  405. run_close_op{},
  406. handler,
  407. impl_,
  408. cr);
  409. }
  410. } // websocket
  411. } // beast
  412. } // boost
  413. #endif