write.hpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
  11. #include <boost/beast/websocket/detail/mask.hpp>
  12. #include <boost/beast/core/async_base.hpp>
  13. #include <boost/beast/core/bind_handler.hpp>
  14. #include <boost/beast/core/buffer_traits.hpp>
  15. #include <boost/beast/core/buffers_cat.hpp>
  16. #include <boost/beast/core/buffers_prefix.hpp>
  17. #include <boost/beast/core/buffers_range.hpp>
  18. #include <boost/beast/core/buffers_suffix.hpp>
  19. #include <boost/beast/core/flat_static_buffer.hpp>
  20. #include <boost/beast/core/stream_traits.hpp>
  21. #include <boost/beast/core/detail/bind_continuation.hpp>
  22. #include <boost/beast/core/detail/clamp.hpp>
  23. #include <boost/beast/core/detail/config.hpp>
  24. #include <boost/beast/websocket/detail/frame.hpp>
  25. #include <boost/beast/websocket/impl/stream_impl.hpp>
  26. #include <boost/asio/coroutine.hpp>
  27. #include <boost/assert.hpp>
  28. #include <boost/config.hpp>
  29. #include <boost/throw_exception.hpp>
  30. #include <algorithm>
  31. #include <memory>
  32. namespace boost {
  33. namespace beast {
  34. namespace websocket {
  35. template<class NextLayer, bool deflateSupported>
  36. template<class Handler, class Buffers>
  37. class stream<NextLayer, deflateSupported>::write_some_op
  38. : public beast::async_base<
  39. Handler, beast::executor_type<stream>>
  40. , public asio::coroutine
  41. {
  42. enum
  43. {
  44. do_nomask_nofrag,
  45. do_nomask_frag,
  46. do_mask_nofrag,
  47. do_mask_frag,
  48. do_deflate
  49. };
  50. boost::weak_ptr<impl_type> wp_;
  51. buffers_suffix<Buffers> cb_;
  52. detail::frame_header fh_;
  53. detail::prepared_key key_;
  54. std::size_t bytes_transferred_ = 0;
  55. std::size_t remain_;
  56. std::size_t in_;
  57. int how_;
  58. bool fin_;
  59. bool more_ = false; // for ubsan
  60. bool cont_ = false;
  61. public:
  62. static constexpr int id = 2; // for soft_mutex
  63. template<class Handler_>
  64. write_some_op(
  65. Handler_&& h,
  66. boost::shared_ptr<impl_type> const& sp,
  67. bool fin,
  68. Buffers const& bs)
  69. : beast::async_base<Handler,
  70. beast::executor_type<stream>>(
  71. std::forward<Handler_>(h),
  72. sp->stream().get_executor())
  73. , wp_(sp)
  74. , cb_(bs)
  75. , fin_(fin)
  76. {
  77. auto& impl = *sp;
  78. // Set up the outgoing frame header
  79. if(! impl.wr_cont)
  80. {
  81. impl.begin_msg();
  82. fh_.rsv1 = impl.wr_compress;
  83. }
  84. else
  85. {
  86. fh_.rsv1 = false;
  87. }
  88. fh_.rsv2 = false;
  89. fh_.rsv3 = false;
  90. fh_.op = impl.wr_cont ?
  91. detail::opcode::cont : impl.wr_opcode;
  92. fh_.mask =
  93. impl.role == role_type::client;
  94. // Choose a write algorithm
  95. if(impl.wr_compress)
  96. {
  97. how_ = do_deflate;
  98. }
  99. else if(! fh_.mask)
  100. {
  101. if(! impl.wr_frag)
  102. {
  103. how_ = do_nomask_nofrag;
  104. }
  105. else
  106. {
  107. BOOST_ASSERT(impl.wr_buf_size != 0);
  108. remain_ = buffer_bytes(cb_);
  109. if(remain_ > impl.wr_buf_size)
  110. how_ = do_nomask_frag;
  111. else
  112. how_ = do_nomask_nofrag;
  113. }
  114. }
  115. else
  116. {
  117. if(! impl.wr_frag)
  118. {
  119. how_ = do_mask_nofrag;
  120. }
  121. else
  122. {
  123. BOOST_ASSERT(impl.wr_buf_size != 0);
  124. remain_ = buffer_bytes(cb_);
  125. if(remain_ > impl.wr_buf_size)
  126. how_ = do_mask_frag;
  127. else
  128. how_ = do_mask_nofrag;
  129. }
  130. }
  131. (*this)({}, 0, false);
  132. }
  133. void operator()(
  134. error_code ec = {},
  135. std::size_t bytes_transferred = 0,
  136. bool cont = true);
  137. };
  138. template<class NextLayer, bool deflateSupported>
  139. template<class Handler, class Buffers>
  140. void
  141. stream<NextLayer, deflateSupported>::
  142. write_some_op<Handler, Buffers>::
  143. operator()(
  144. error_code ec,
  145. std::size_t bytes_transferred,
  146. bool cont)
  147. {
  148. using beast::detail::clamp;
  149. std::size_t n;
  150. net::mutable_buffer b;
  151. auto sp = wp_.lock();
  152. if(! sp)
  153. {
  154. ec = net::error::operation_aborted;
  155. bytes_transferred_ = 0;
  156. return this->complete(cont, ec, bytes_transferred_);
  157. }
  158. auto& impl = *sp;
  159. BOOST_ASIO_CORO_REENTER(*this)
  160. {
  161. // Acquire the write lock
  162. if(! impl.wr_block.try_lock(this))
  163. {
  164. do_suspend:
  165. BOOST_ASIO_CORO_YIELD
  166. {
  167. BOOST_ASIO_HANDLER_LOCATION((
  168. __FILE__, __LINE__,
  169. fin_ ?
  170. "websocket::async_write" :
  171. "websocket::async_write_some"
  172. ));
  173. impl.op_wr.emplace(std::move(*this));
  174. }
  175. impl.wr_block.lock(this);
  176. BOOST_ASIO_CORO_YIELD
  177. {
  178. BOOST_ASIO_HANDLER_LOCATION((
  179. __FILE__, __LINE__,
  180. fin_ ?
  181. "websocket::async_write" :
  182. "websocket::async_write_some"
  183. ));
  184. net::post(std::move(*this));
  185. }
  186. BOOST_ASSERT(impl.wr_block.is_locked(this));
  187. }
  188. if(impl.check_stop_now(ec))
  189. goto upcall;
  190. //------------------------------------------------------------------
  191. if(how_ == do_nomask_nofrag)
  192. {
  193. // send a single frame
  194. fh_.fin = fin_;
  195. fh_.len = buffer_bytes(cb_);
  196. impl.wr_fb.clear();
  197. detail::write<flat_static_buffer_base>(
  198. impl.wr_fb, fh_);
  199. impl.wr_cont = ! fin_;
  200. BOOST_ASIO_CORO_YIELD
  201. {
  202. BOOST_ASIO_HANDLER_LOCATION((
  203. __FILE__, __LINE__,
  204. fin_ ?
  205. "websocket::async_write" :
  206. "websocket::async_write_some"
  207. ));
  208. net::async_write(impl.stream(),
  209. buffers_cat(
  210. net::const_buffer(impl.wr_fb.data()),
  211. net::const_buffer(0, 0),
  212. cb_,
  213. buffers_prefix(0, cb_)
  214. ),
  215. beast::detail::bind_continuation(std::move(*this)));
  216. }
  217. bytes_transferred_ += clamp(fh_.len);
  218. if(impl.check_stop_now(ec))
  219. goto upcall;
  220. goto upcall;
  221. }
  222. //------------------------------------------------------------------
  223. if(how_ == do_nomask_frag)
  224. {
  225. // send multiple frames
  226. for(;;)
  227. {
  228. n = clamp(remain_, impl.wr_buf_size);
  229. fh_.len = n;
  230. remain_ -= n;
  231. fh_.fin = fin_ ? remain_ == 0 : false;
  232. impl.wr_fb.clear();
  233. detail::write<flat_static_buffer_base>(
  234. impl.wr_fb, fh_);
  235. impl.wr_cont = ! fin_;
  236. // Send frame
  237. BOOST_ASIO_CORO_YIELD
  238. {
  239. BOOST_ASIO_HANDLER_LOCATION((
  240. __FILE__, __LINE__,
  241. fin_ ?
  242. "websocket::async_write" :
  243. "websocket::async_write_some"
  244. ));
  245. buffers_suffix<Buffers> empty_cb(cb_);
  246. empty_cb.consume(~std::size_t(0));
  247. net::async_write(impl.stream(),
  248. buffers_cat(
  249. net::const_buffer(impl.wr_fb.data()),
  250. net::const_buffer(0, 0),
  251. empty_cb,
  252. buffers_prefix(clamp(fh_.len), cb_)
  253. ),
  254. beast::detail::bind_continuation(std::move(*this)));
  255. }
  256. n = clamp(fh_.len); // restore `n` on yield
  257. bytes_transferred_ += n;
  258. if(impl.check_stop_now(ec))
  259. goto upcall;
  260. if(remain_ == 0)
  261. break;
  262. cb_.consume(n);
  263. fh_.op = detail::opcode::cont;
  264. // Give up the write lock in between each frame
  265. // so that outgoing control frames might be sent.
  266. impl.wr_block.unlock(this);
  267. if( impl.op_close.maybe_invoke()
  268. || impl.op_idle_ping.maybe_invoke()
  269. || impl.op_rd.maybe_invoke()
  270. || impl.op_ping.maybe_invoke())
  271. {
  272. BOOST_ASSERT(impl.wr_block.is_locked());
  273. goto do_suspend;
  274. }
  275. impl.wr_block.lock(this);
  276. }
  277. goto upcall;
  278. }
  279. //------------------------------------------------------------------
  280. if(how_ == do_mask_nofrag)
  281. {
  282. // send a single frame using multiple writes
  283. remain_ = beast::buffer_bytes(cb_);
  284. fh_.fin = fin_;
  285. fh_.len = remain_;
  286. fh_.key = impl.create_mask();
  287. detail::prepare_key(key_, fh_.key);
  288. impl.wr_fb.clear();
  289. detail::write<flat_static_buffer_base>(
  290. impl.wr_fb, fh_);
  291. n = clamp(remain_, impl.wr_buf_size);
  292. net::buffer_copy(net::buffer(
  293. impl.wr_buf.get(), n), cb_);
  294. detail::mask_inplace(net::buffer(
  295. impl.wr_buf.get(), n), key_);
  296. remain_ -= n;
  297. impl.wr_cont = ! fin_;
  298. // write frame header and some payload
  299. BOOST_ASIO_CORO_YIELD
  300. {
  301. BOOST_ASIO_HANDLER_LOCATION((
  302. __FILE__, __LINE__,
  303. fin_ ?
  304. "websocket::async_write" :
  305. "websocket::async_write_some"
  306. ));
  307. buffers_suffix<Buffers> empty_cb(cb_);
  308. empty_cb.consume(~std::size_t(0));
  309. net::async_write(impl.stream(),
  310. buffers_cat(
  311. net::const_buffer(impl.wr_fb.data()),
  312. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  313. empty_cb,
  314. buffers_prefix(0, empty_cb)
  315. ),
  316. beast::detail::bind_continuation(std::move(*this)));
  317. }
  318. // VFALCO What about consuming the buffer on error?
  319. bytes_transferred_ +=
  320. bytes_transferred - impl.wr_fb.size();
  321. if(impl.check_stop_now(ec))
  322. goto upcall;
  323. while(remain_ > 0)
  324. {
  325. cb_.consume(impl.wr_buf_size);
  326. n = clamp(remain_, impl.wr_buf_size);
  327. net::buffer_copy(net::buffer(
  328. impl.wr_buf.get(), n), cb_);
  329. detail::mask_inplace(net::buffer(
  330. impl.wr_buf.get(), n), key_);
  331. remain_ -= n;
  332. // write more payload
  333. BOOST_ASIO_CORO_YIELD
  334. {
  335. BOOST_ASIO_HANDLER_LOCATION((
  336. __FILE__, __LINE__,
  337. fin_ ?
  338. "websocket::async_write" :
  339. "websocket::async_write_some"
  340. ));
  341. buffers_suffix<Buffers> empty_cb(cb_);
  342. empty_cb.consume(~std::size_t(0));
  343. net::async_write(impl.stream(),
  344. buffers_cat(
  345. net::const_buffer(0, 0),
  346. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  347. empty_cb,
  348. buffers_prefix(0, empty_cb)
  349. ),
  350. beast::detail::bind_continuation(std::move(*this)));
  351. }
  352. bytes_transferred_ += bytes_transferred;
  353. if(impl.check_stop_now(ec))
  354. goto upcall;
  355. }
  356. goto upcall;
  357. }
  358. //------------------------------------------------------------------
  359. if(how_ == do_mask_frag)
  360. {
  361. // send multiple frames
  362. for(;;)
  363. {
  364. n = clamp(remain_, impl.wr_buf_size);
  365. remain_ -= n;
  366. fh_.len = n;
  367. fh_.key = impl.create_mask();
  368. fh_.fin = fin_ ? remain_ == 0 : false;
  369. detail::prepare_key(key_, fh_.key);
  370. net::buffer_copy(net::buffer(
  371. impl.wr_buf.get(), n), cb_);
  372. detail::mask_inplace(net::buffer(
  373. impl.wr_buf.get(), n), key_);
  374. impl.wr_fb.clear();
  375. detail::write<flat_static_buffer_base>(
  376. impl.wr_fb, fh_);
  377. impl.wr_cont = ! fin_;
  378. // Send frame
  379. BOOST_ASIO_CORO_YIELD
  380. {
  381. BOOST_ASIO_HANDLER_LOCATION((
  382. __FILE__, __LINE__,
  383. fin_ ?
  384. "websocket::async_write" :
  385. "websocket::async_write_some"
  386. ));
  387. buffers_suffix<Buffers> empty_cb(cb_);
  388. empty_cb.consume(~std::size_t(0));
  389. net::async_write(impl.stream(),
  390. buffers_cat(
  391. net::const_buffer(impl.wr_fb.data()),
  392. net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
  393. empty_cb,
  394. buffers_prefix(0, empty_cb)
  395. ),
  396. beast::detail::bind_continuation(std::move(*this)));
  397. }
  398. n = bytes_transferred - impl.wr_fb.size();
  399. bytes_transferred_ += n;
  400. if(impl.check_stop_now(ec))
  401. goto upcall;
  402. if(remain_ == 0)
  403. break;
  404. cb_.consume(n);
  405. fh_.op = detail::opcode::cont;
  406. // Give up the write lock in between each frame
  407. // so that outgoing control frames might be sent.
  408. impl.wr_block.unlock(this);
  409. if( impl.op_close.maybe_invoke()
  410. || impl.op_idle_ping.maybe_invoke()
  411. || impl.op_rd.maybe_invoke()
  412. || impl.op_ping.maybe_invoke())
  413. {
  414. BOOST_ASSERT(impl.wr_block.is_locked());
  415. goto do_suspend;
  416. }
  417. impl.wr_block.lock(this);
  418. }
  419. goto upcall;
  420. }
  421. //------------------------------------------------------------------
  422. if(how_ == do_deflate)
  423. {
  424. // send compressed frames
  425. for(;;)
  426. {
  427. b = net::buffer(impl.wr_buf.get(),
  428. impl.wr_buf_size);
  429. more_ = impl.deflate(b, cb_, fin_, in_, ec);
  430. if(impl.check_stop_now(ec))
  431. goto upcall;
  432. n = buffer_bytes(b);
  433. if(n == 0)
  434. {
  435. // The input was consumed, but there is
  436. // no output due to compression latency.
  437. BOOST_ASSERT(! fin_);
  438. BOOST_ASSERT(buffer_bytes(cb_) == 0);
  439. goto upcall;
  440. }
  441. if(fh_.mask)
  442. {
  443. fh_.key = impl.create_mask();
  444. detail::prepared_key key;
  445. detail::prepare_key(key, fh_.key);
  446. detail::mask_inplace(b, key);
  447. }
  448. fh_.fin = ! more_;
  449. fh_.len = n;
  450. impl.wr_fb.clear();
  451. detail::write<
  452. flat_static_buffer_base>(impl.wr_fb, fh_);
  453. impl.wr_cont = ! fin_;
  454. // Send frame
  455. BOOST_ASIO_CORO_YIELD
  456. {
  457. BOOST_ASIO_HANDLER_LOCATION((
  458. __FILE__, __LINE__,
  459. fin_ ?
  460. "websocket::async_write" :
  461. "websocket::async_write_some"
  462. ));
  463. buffers_suffix<Buffers> empty_cb(cb_);
  464. empty_cb.consume(~std::size_t(0));
  465. net::async_write(impl.stream(),
  466. buffers_cat(
  467. net::const_buffer(impl.wr_fb.data()),
  468. net::const_buffer(b),
  469. empty_cb,
  470. buffers_prefix(0, empty_cb)
  471. ),
  472. beast::detail::bind_continuation(std::move(*this)));
  473. }
  474. bytes_transferred_ += in_;
  475. if(impl.check_stop_now(ec))
  476. goto upcall;
  477. if(more_)
  478. {
  479. fh_.op = detail::opcode::cont;
  480. fh_.rsv1 = false;
  481. // Give up the write lock in between each frame
  482. // so that outgoing control frames might be sent.
  483. impl.wr_block.unlock(this);
  484. if( impl.op_close.maybe_invoke()
  485. || impl.op_idle_ping.maybe_invoke()
  486. || impl.op_rd.maybe_invoke()
  487. || impl.op_ping.maybe_invoke())
  488. {
  489. BOOST_ASSERT(impl.wr_block.is_locked());
  490. goto do_suspend;
  491. }
  492. impl.wr_block.lock(this);
  493. }
  494. else
  495. {
  496. if(fh_.fin)
  497. impl.do_context_takeover_write(impl.role);
  498. goto upcall;
  499. }
  500. }
  501. }
  502. //--------------------------------------------------------------------------
  503. upcall:
  504. impl.wr_block.unlock(this);
  505. impl.op_close.maybe_invoke()
  506. || impl.op_idle_ping.maybe_invoke()
  507. || impl.op_rd.maybe_invoke()
  508. || impl.op_ping.maybe_invoke();
  509. this->complete(cont, ec, bytes_transferred_);
  510. }
  511. }
  512. template<class NextLayer, bool deflateSupported>
  513. struct stream<NextLayer, deflateSupported>::
  514. run_write_some_op
  515. {
  516. template<
  517. class WriteHandler,
  518. class ConstBufferSequence>
  519. void
  520. operator()(
  521. WriteHandler&& h,
  522. boost::shared_ptr<impl_type> const& sp,
  523. bool fin,
  524. ConstBufferSequence const& b)
  525. {
  526. // If you get an error on the following line it means
  527. // that your handler does not meet the documented type
  528. // requirements for the handler.
  529. static_assert(
  530. beast::detail::is_invocable<WriteHandler,
  531. void(error_code, std::size_t)>::value,
  532. "WriteHandler type requirements not met");
  533. write_some_op<
  534. typename std::decay<WriteHandler>::type,
  535. ConstBufferSequence>(
  536. std::forward<WriteHandler>(h),
  537. sp,
  538. fin,
  539. b);
  540. }
  541. };
  542. //------------------------------------------------------------------------------
  543. template<class NextLayer, bool deflateSupported>
  544. template<class ConstBufferSequence>
  545. std::size_t
  546. stream<NextLayer, deflateSupported>::
  547. write_some(bool fin, ConstBufferSequence const& buffers)
  548. {
  549. static_assert(is_sync_stream<next_layer_type>::value,
  550. "SyncStream type requirements not met");
  551. static_assert(net::is_const_buffer_sequence<
  552. ConstBufferSequence>::value,
  553. "ConstBufferSequence type requirements not met");
  554. error_code ec;
  555. auto const bytes_transferred =
  556. write_some(fin, buffers, ec);
  557. if(ec)
  558. BOOST_THROW_EXCEPTION(system_error{ec});
  559. return bytes_transferred;
  560. }
  561. template<class NextLayer, bool deflateSupported>
  562. template<class ConstBufferSequence>
  563. std::size_t
  564. stream<NextLayer, deflateSupported>::
  565. write_some(bool fin,
  566. ConstBufferSequence const& buffers, error_code& ec)
  567. {
  568. static_assert(is_sync_stream<next_layer_type>::value,
  569. "SyncStream type requirements not met");
  570. static_assert(net::is_const_buffer_sequence<
  571. ConstBufferSequence>::value,
  572. "ConstBufferSequence type requirements not met");
  573. using beast::detail::clamp;
  574. auto& impl = *impl_;
  575. std::size_t bytes_transferred = 0;
  576. ec = {};
  577. if(impl.check_stop_now(ec))
  578. return bytes_transferred;
  579. detail::frame_header fh;
  580. if(! impl.wr_cont)
  581. {
  582. impl.begin_msg();
  583. fh.rsv1 = impl.wr_compress;
  584. }
  585. else
  586. {
  587. fh.rsv1 = false;
  588. }
  589. fh.rsv2 = false;
  590. fh.rsv3 = false;
  591. fh.op = impl.wr_cont ?
  592. detail::opcode::cont : impl.wr_opcode;
  593. fh.mask = impl.role == role_type::client;
  594. auto remain = buffer_bytes(buffers);
  595. if(impl.wr_compress)
  596. {
  597. buffers_suffix<
  598. ConstBufferSequence> cb(buffers);
  599. for(;;)
  600. {
  601. auto b = net::buffer(
  602. impl.wr_buf.get(), impl.wr_buf_size);
  603. auto const more = impl.deflate(
  604. b, cb, fin, bytes_transferred, ec);
  605. if(impl.check_stop_now(ec))
  606. return bytes_transferred;
  607. auto const n = buffer_bytes(b);
  608. if(n == 0)
  609. {
  610. // The input was consumed, but there
  611. // is no output due to compression
  612. // latency.
  613. BOOST_ASSERT(! fin);
  614. BOOST_ASSERT(buffer_bytes(cb) == 0);
  615. fh.fin = false;
  616. break;
  617. }
  618. if(fh.mask)
  619. {
  620. fh.key = this->impl_->create_mask();
  621. detail::prepared_key key;
  622. detail::prepare_key(key, fh.key);
  623. detail::mask_inplace(b, key);
  624. }
  625. fh.fin = ! more;
  626. fh.len = n;
  627. detail::fh_buffer fh_buf;
  628. detail::write<
  629. flat_static_buffer_base>(fh_buf, fh);
  630. impl.wr_cont = ! fin;
  631. net::write(impl.stream(),
  632. buffers_cat(fh_buf.data(), b), ec);
  633. if(impl.check_stop_now(ec))
  634. return bytes_transferred;
  635. if(! more)
  636. break;
  637. fh.op = detail::opcode::cont;
  638. fh.rsv1 = false;
  639. }
  640. if(fh.fin)
  641. impl.do_context_takeover_write(impl.role);
  642. }
  643. else if(! fh.mask)
  644. {
  645. if(! impl.wr_frag)
  646. {
  647. // no mask, no autofrag
  648. fh.fin = fin;
  649. fh.len = remain;
  650. detail::fh_buffer fh_buf;
  651. detail::write<
  652. flat_static_buffer_base>(fh_buf, fh);
  653. impl.wr_cont = ! fin;
  654. net::write(impl.stream(),
  655. buffers_cat(fh_buf.data(), buffers), ec);
  656. if(impl.check_stop_now(ec))
  657. return bytes_transferred;
  658. bytes_transferred += remain;
  659. }
  660. else
  661. {
  662. // no mask, autofrag
  663. BOOST_ASSERT(impl.wr_buf_size != 0);
  664. buffers_suffix<
  665. ConstBufferSequence> cb{buffers};
  666. for(;;)
  667. {
  668. auto const n = clamp(remain, impl.wr_buf_size);
  669. remain -= n;
  670. fh.len = n;
  671. fh.fin = fin ? remain == 0 : false;
  672. detail::fh_buffer fh_buf;
  673. detail::write<
  674. flat_static_buffer_base>(fh_buf, fh);
  675. impl.wr_cont = ! fin;
  676. net::write(impl.stream(),
  677. beast::buffers_cat(fh_buf.data(),
  678. beast::buffers_prefix(n, cb)), ec);
  679. bytes_transferred += n;
  680. if(impl.check_stop_now(ec))
  681. return bytes_transferred;
  682. if(remain == 0)
  683. break;
  684. fh.op = detail::opcode::cont;
  685. cb.consume(n);
  686. }
  687. }
  688. }
  689. else if(! impl.wr_frag)
  690. {
  691. // mask, no autofrag
  692. fh.fin = fin;
  693. fh.len = remain;
  694. fh.key = this->impl_->create_mask();
  695. detail::prepared_key key;
  696. detail::prepare_key(key, fh.key);
  697. detail::fh_buffer fh_buf;
  698. detail::write<
  699. flat_static_buffer_base>(fh_buf, fh);
  700. buffers_suffix<
  701. ConstBufferSequence> cb{buffers};
  702. {
  703. auto const n =
  704. clamp(remain, impl.wr_buf_size);
  705. auto const b =
  706. net::buffer(impl.wr_buf.get(), n);
  707. net::buffer_copy(b, cb);
  708. cb.consume(n);
  709. remain -= n;
  710. detail::mask_inplace(b, key);
  711. impl.wr_cont = ! fin;
  712. net::write(impl.stream(),
  713. buffers_cat(fh_buf.data(), b), ec);
  714. bytes_transferred += n;
  715. if(impl.check_stop_now(ec))
  716. return bytes_transferred;
  717. }
  718. while(remain > 0)
  719. {
  720. auto const n =
  721. clamp(remain, impl.wr_buf_size);
  722. auto const b =
  723. net::buffer(impl.wr_buf.get(), n);
  724. net::buffer_copy(b, cb);
  725. cb.consume(n);
  726. remain -= n;
  727. detail::mask_inplace(b, key);
  728. net::write(impl.stream(), b, ec);
  729. bytes_transferred += n;
  730. if(impl.check_stop_now(ec))
  731. return bytes_transferred;
  732. }
  733. }
  734. else
  735. {
  736. // mask, autofrag
  737. BOOST_ASSERT(impl.wr_buf_size != 0);
  738. buffers_suffix<
  739. ConstBufferSequence> cb(buffers);
  740. for(;;)
  741. {
  742. fh.key = this->impl_->create_mask();
  743. detail::prepared_key key;
  744. detail::prepare_key(key, fh.key);
  745. auto const n =
  746. clamp(remain, impl.wr_buf_size);
  747. auto const b =
  748. net::buffer(impl.wr_buf.get(), n);
  749. net::buffer_copy(b, cb);
  750. detail::mask_inplace(b, key);
  751. fh.len = n;
  752. remain -= n;
  753. fh.fin = fin ? remain == 0 : false;
  754. impl.wr_cont = ! fh.fin;
  755. detail::fh_buffer fh_buf;
  756. detail::write<
  757. flat_static_buffer_base>(fh_buf, fh);
  758. net::write(impl.stream(),
  759. buffers_cat(fh_buf.data(), b), ec);
  760. bytes_transferred += n;
  761. if(impl.check_stop_now(ec))
  762. return bytes_transferred;
  763. if(remain == 0)
  764. break;
  765. fh.op = detail::opcode::cont;
  766. cb.consume(n);
  767. }
  768. }
  769. return bytes_transferred;
  770. }
  771. template<class NextLayer, bool deflateSupported>
  772. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  773. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  774. stream<NextLayer, deflateSupported>::
  775. async_write_some(bool fin,
  776. ConstBufferSequence const& bs, WriteHandler&& handler)
  777. {
  778. static_assert(is_async_stream<next_layer_type>::value,
  779. "AsyncStream type requirements not met");
  780. static_assert(net::is_const_buffer_sequence<
  781. ConstBufferSequence>::value,
  782. "ConstBufferSequence type requirements not met");
  783. return net::async_initiate<
  784. WriteHandler,
  785. void(error_code, std::size_t)>(
  786. run_write_some_op{},
  787. handler,
  788. impl_,
  789. fin,
  790. bs);
  791. }
  792. //------------------------------------------------------------------------------
  793. template<class NextLayer, bool deflateSupported>
  794. template<class ConstBufferSequence>
  795. std::size_t
  796. stream<NextLayer, deflateSupported>::
  797. write(ConstBufferSequence const& buffers)
  798. {
  799. static_assert(is_sync_stream<next_layer_type>::value,
  800. "SyncStream type requirements not met");
  801. static_assert(net::is_const_buffer_sequence<
  802. ConstBufferSequence>::value,
  803. "ConstBufferSequence type requirements not met");
  804. error_code ec;
  805. auto const bytes_transferred = write(buffers, ec);
  806. if(ec)
  807. BOOST_THROW_EXCEPTION(system_error{ec});
  808. return bytes_transferred;
  809. }
  810. template<class NextLayer, bool deflateSupported>
  811. template<class ConstBufferSequence>
  812. std::size_t
  813. stream<NextLayer, deflateSupported>::
  814. write(ConstBufferSequence const& buffers, error_code& ec)
  815. {
  816. static_assert(is_sync_stream<next_layer_type>::value,
  817. "SyncStream type requirements not met");
  818. static_assert(net::is_const_buffer_sequence<
  819. ConstBufferSequence>::value,
  820. "ConstBufferSequence type requirements not met");
  821. return write_some(true, buffers, ec);
  822. }
  823. template<class NextLayer, bool deflateSupported>
  824. template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
  825. BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
  826. stream<NextLayer, deflateSupported>::
  827. async_write(
  828. ConstBufferSequence const& bs, WriteHandler&& handler)
  829. {
  830. static_assert(is_async_stream<next_layer_type>::value,
  831. "AsyncStream type requirements not met");
  832. static_assert(net::is_const_buffer_sequence<
  833. ConstBufferSequence>::value,
  834. "ConstBufferSequence type requirements not met");
  835. return net::async_initiate<
  836. WriteHandler,
  837. void(error_code, std::size_t)>(
  838. run_write_some_op{},
  839. handler,
  840. impl_,
  841. true,
  842. bs);
  843. }
  844. } // websocket
  845. } // beast
  846. } // boost
  847. #endif