async_client.h 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file async_client.h
  3. /// Declaration of MQTT async_client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v1.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v10.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. * Frank Pagliughi - MQTT v5 support
  22. *******************************************************************************/
  23. #ifndef __mqtt_async_client_h
  24. #define __mqtt_async_client_h
  25. #include "MQTTAsync.h"
  26. #include "mqtt/types.h"
  27. #include "mqtt/token.h"
  28. #include "mqtt/string_collection.h"
  29. #include "mqtt/delivery_token.h"
  30. #include "mqtt/iclient_persistence.h"
  31. #include "mqtt/iaction_listener.h"
  32. #include "mqtt/exception.h"
  33. #include "mqtt/message.h"
  34. #include "mqtt/callback.h"
  35. #include "mqtt/thread_queue.h"
  36. #include "mqtt/iasync_client.h"
  37. #include <vector>
  38. #include <list>
  39. #include <memory>
  40. #include <tuple>
  41. #include <functional>
  42. #include <stdexcept>
  43. namespace mqtt {
  44. // OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
  45. // clashed with #define's from other libraries and will be removed at the
  46. // next major version upgrade.
  47. #if defined(PAHO_MQTTPP_VERSIONS)
  48. /** The version number for the client library. */
  49. const uint32_t PAHO_MQTTPP_VERSION = 0x01010000;
  50. /** The version string for the client library */
  51. const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.1");
  52. /** Copyright notice for the client library */
  53. const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2019 Frank Pagliughi");
  54. #else
  55. /** The version number for the client library. */
  56. const uint32_t VERSION = 0x01010000;
  57. /** The version string for the client library */
  58. const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.1");
  59. /** Copyright notice for the client library */
  60. const string COPYRIGHT("Copyright (c) 2013-2019 Frank Pagliughi");
  61. #endif
  62. /////////////////////////////////////////////////////////////////////////////
  63. /**
  64. * Lightweight client for talking to an MQTT server using non-blocking
  65. * methods that allow an operation to run in the background.
  66. */
  67. class async_client : public virtual iasync_client
  68. {
  69. public:
  70. /** Smart/shared pointer for an object of this class */
  71. using ptr_t = std::shared_ptr<async_client>;
  72. /** Type for a thread-safe queue to consume messages synchronously */
  73. using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
  74. /** Handler type for registering an individual message callback */
  75. using message_handler = std::function<void(const_message_ptr)>;
  76. /** Handler type for when a connecion is made or lost */
  77. using connection_handler = std::function<void(const string& cause)>;
  78. private:
  79. /** Lock guard type for this class */
  80. using guard = std::unique_lock<std::mutex>;
  81. /** Unique lock type for this class */
  82. using unique_lock = std::unique_lock<std::mutex>;
  83. /** Object monitor mutex */
  84. mutable std::mutex lock_;
  85. /** The underlying C-lib client. */
  86. MQTTAsync cli_;
  87. /** The server URI string. */
  88. string serverURI_;
  89. /** The client ID string that we provided to the server. */
  90. string clientId_;
  91. /** The MQTT protocol version we're connected at */
  92. int mqttVersion_;
  93. /** A user persistence wrapper (if any) */
  94. std::unique_ptr<MQTTClient_persistence> persist_;
  95. /** Callback supplied by the user (if any) */
  96. callback* userCallback_;
  97. /** Connection handler */
  98. connection_handler connHandler_;
  99. /** Connection lost handler */
  100. connection_handler connLostHandler_;
  101. /** Message handler (if any) */
  102. message_handler msgHandler_;
  103. /** Copy of connect token (for re-connects) */
  104. token_ptr connTok_;
  105. /** A list of tokens that are in play */
  106. std::list<token_ptr> pendingTokens_;
  107. /** A list of delivery tokens that are in play */
  108. std::list<delivery_token_ptr> pendingDeliveryTokens_;
  109. /** A queue of messages for consumer API */
  110. consumer_queue_type que_;
  111. /** Callbacks from the C library */
  112. static void on_connected(void* context, char* cause);
  113. static void on_connection_lost(void *context, char *cause);
  114. static int on_message_arrived(void* context, char* topicName, int topicLen,
  115. MQTTAsync_message* msg);
  116. static void on_delivery_complete(void* context, MQTTAsync_token tok);
  117. /** Manage internal list of active tokens */
  118. friend class token;
  119. virtual void add_token(token_ptr tok);
  120. virtual void add_token(delivery_token_ptr tok);
  121. virtual void remove_token(token* tok) override;
  122. virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
  123. void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
  124. /** Non-copyable */
  125. async_client() =delete;
  126. async_client(const async_client&) =delete;
  127. async_client& operator=(const async_client&) =delete;
  128. /** Checks a function return code and throws on error. */
  129. static void check_ret(int rc) {
  130. if (rc != MQTTASYNC_SUCCESS)
  131. throw exception(rc);
  132. }
  133. public:
  134. /**
  135. * Create an async_client that can be used to communicate with an MQTT
  136. * server.
  137. * This uses file-based persistence in the specified directory.
  138. * @param serverURI the address of the server to connect to, specified
  139. * as a URI.
  140. * @param clientId a client identifier that is unique on the server
  141. * being connected to
  142. * @param persistDir The directory to use for persistence data
  143. * @throw exception if an argument is invalid
  144. */
  145. async_client(const string& serverURI, const string& clientId,
  146. const string& persistDir);
  147. /**
  148. * Create an async_client that can be used to communicate with an MQTT
  149. * server.
  150. * This allows the caller to specify a user-defined persistence object,
  151. * or use no persistence.
  152. * @param serverURI the address of the server to connect to, specified
  153. * as a URI.
  154. * @param clientId a client identifier that is unique on the server
  155. * being connected to
  156. * @param persistence The user persistence structure. If this is null,
  157. * then no persistence is used.
  158. * @throw exception if an argument is invalid
  159. */
  160. async_client(const string& serverURI, const string& clientId,
  161. iclient_persistence* persistence=nullptr);
  162. /**
  163. * Create an async_client that can be used to communicate with an MQTT
  164. * server, which allows for off-line message buffering.
  165. * This uses file-based persistence in the specified directory.
  166. * @param serverURI the address of the server to connect to, specified
  167. * as a URI.
  168. * @param clientId a client identifier that is unique on the server
  169. * being connected to
  170. * @param maxBufferedMessages the maximum number of messages allowed to
  171. * be buffered while not connected
  172. * @param persistDir The directory to use for persistence data
  173. * @throw exception if an argument is invalid
  174. */
  175. async_client(const string& serverURI, const string& clientId,
  176. int maxBufferedMessages, const string& persistDir);
  177. /**
  178. * Create an async_client that can be used to communicate with an MQTT
  179. * server, which allows for off-line message buffering.
  180. * This allows the caller to specify a user-defined persistence object,
  181. * or use no persistence.
  182. * @param serverURI the address of the server to connect to, specified
  183. * as a URI.
  184. * @param clientId a client identifier that is unique on the server
  185. * being connected to
  186. * @param maxBufferedMessages the maximum number of messages allowed to
  187. * be buffered while not connected
  188. * @param persistence The user persistence structure. If this is null,
  189. * then no persistence is used.
  190. * @throw exception if an argument is invalid
  191. */
  192. async_client(const string& serverURI, const string& clientId,
  193. int maxBufferedMessages, iclient_persistence* persistence=nullptr);
  194. /**
  195. * Destructor
  196. */
  197. ~async_client() override;
  198. /**
  199. * Sets a callback listener to use for events that happen
  200. * asynchronously.
  201. * @param cb callback receiver which will be invoked for certain
  202. * asynchronous events
  203. */
  204. void set_callback(callback& cb) override;
  205. /**
  206. * Stops callbacks.
  207. * This is not normally called by the application. It should be used
  208. * cautiously as it may cause the application to lose messages.
  209. */
  210. void disable_callbacks() override;
  211. /**
  212. * Callback for when a connection is made.
  213. * @param cb Callback functor for when the connection is made.
  214. */
  215. void set_connected_handler(connection_handler cb) /*override*/;
  216. /**
  217. * Callback for when a connection is lost.
  218. * @param cb Callback functor for when the connection is lost.
  219. */
  220. void set_connection_lost_handler(connection_handler cb) /*override*/;
  221. /**
  222. * Sets the callback for when a message arrives from the broker.
  223. * Note that the application can only have one message handler which can
  224. * be installed individually using this method, or installled as a
  225. * listener object.
  226. * @param cb The callback functor to register with the library.
  227. */
  228. void set_message_callback(message_handler cb) /*override*/;
  229. /**
  230. * Connects to an MQTT server using the default options.
  231. * @return token used to track and wait for the connect to complete. The
  232. * token will be passed to any callback that has been set.
  233. * @throw exception for non security related problems
  234. * @throw security_exception for security related problems
  235. */
  236. token_ptr connect() override;
  237. /**
  238. * Connects to an MQTT server using the provided connect options.
  239. * @param options a set of connection parameters that override the
  240. * defaults.
  241. * @return token used to track and wait for the connect to complete. The
  242. * token will be passed to any callback that has been set.
  243. * @throw exception for non security related problems
  244. * @throw security_exception for security related problems
  245. */
  246. token_ptr connect(connect_options options) override;
  247. /**
  248. * Connects to an MQTT server using the specified options.
  249. * @param options a set of connection parameters that override the
  250. * defaults.
  251. * @param userContext optional object used to pass context to the
  252. * callback. Use @em nullptr if not required.
  253. * @param cb callback listener that will be notified when the connect
  254. * completes.
  255. * @return token used to track and wait for the connect to complete. The
  256. * token will be passed to any callback that has been set.
  257. * @throw exception for non security related problems
  258. * @throw security_exception for security related problems
  259. */
  260. token_ptr connect(connect_options options, void* userContext,
  261. iaction_listener& cb) override;
  262. /**
  263. *
  264. * @param userContext optional object used to pass context to the
  265. * callback. Use @em nullptr if not required.
  266. * @param cb callback listener that will be notified when the connect
  267. * completes.
  268. * @return token used to track and wait for the connect to complete. The
  269. * token will be passed to any callback that has been set.
  270. * @throw exception for non security related problems
  271. * @throw security_exception for security related problems
  272. */
  273. token_ptr connect(void* userContext, iaction_listener& cb) override {
  274. return connect(connect_options{}, userContext, cb);
  275. }
  276. /**
  277. * Reconnects the client using options from the previous connect.
  278. * The client must have previously called connect() for this to work.
  279. * @return token used to track the progress of the reconnect.
  280. */
  281. token_ptr reconnect() override;
  282. /**
  283. * Disconnects from the server.
  284. * @return token used to track and wait for the disconnect to complete.
  285. * The token will be passed to any callback that has been set.
  286. * @throw exception for problems encountered while disconnecting
  287. */
  288. token_ptr disconnect() override { return disconnect(disconnect_options()); }
  289. /**
  290. * Disconnects from the server.
  291. * @param opts Options for disconnecting.
  292. * @return token used to track and wait for the disconnect to complete.
  293. * The token will be passed to any callback that has been set.
  294. * @throw exception for problems encountered while disconnecting
  295. */
  296. token_ptr disconnect(disconnect_options opts) override;
  297. /**
  298. * Disconnects from the server.
  299. * @param timeout the amount of time in milliseconds to allow for
  300. * existing work to finish before disconnecting. A value
  301. * of zero or less means the client will not quiesce.
  302. * @return Token used to track and wait for disconnect to complete. The
  303. * token will be passed to the callback methods if a callback is
  304. * set.
  305. * @throw exception for problems encountered while disconnecting
  306. */
  307. token_ptr disconnect(int timeout) override {
  308. return disconnect(disconnect_options(timeout));
  309. }
  310. /**
  311. * Disconnects from the server.
  312. * @param timeout the amount of time in milliseconds to allow for
  313. * existing work to finish before disconnecting. A value
  314. * of zero or less means the client will not quiesce.
  315. * @return Token used to track and wait for disconnect to complete. The
  316. * token will be passed to the callback methods if a callback is
  317. * set.
  318. * @throw exception for problems encountered while disconnecting
  319. */
  320. template <class Rep, class Period>
  321. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
  322. // TODO: check range
  323. return disconnect((int) to_milliseconds_count(timeout));
  324. }
  325. /**
  326. * Disconnects from the server.
  327. * @param timeout the amount of time in milliseconds to allow for
  328. * existing work to finish before disconnecting. A value
  329. * of zero or less means the client will not quiesce.
  330. * @param userContext optional object used to pass context to the
  331. * callback. Use @em nullptr if not required.
  332. * @param cb callback listener that will be notified when the disconnect
  333. * completes.
  334. * @return token_ptr Token used to track and wait for disconnect to
  335. * complete. The token will be passed to the callback methods if
  336. * a callback is set.
  337. * @throw exception for problems encountered while disconnecting
  338. */
  339. token_ptr disconnect(int timeout, void* userContext,
  340. iaction_listener& cb) override;
  341. /**
  342. * Disconnects from the server.
  343. * @param timeout the amount of time in milliseconds to allow for
  344. * existing work to finish before disconnecting. A value
  345. * of zero or less means the client will not quiesce.
  346. * @param userContext optional object used to pass context to the
  347. * callback. Use @em nullptr if not required.
  348. * @param cb callback listener that will be notified when the disconnect
  349. * completes.
  350. * @return token_ptr Token used to track and wait for disconnect to
  351. * complete. The token will be passed to the callback methods if
  352. * a callback is set.
  353. * @throw exception for problems encountered while disconnecting
  354. */
  355. template <class Rep, class Period>
  356. token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout,
  357. void* userContext, iaction_listener& cb) {
  358. // TODO: check range
  359. return disconnect((int) to_milliseconds_count(timeout), userContext, cb);
  360. }
  361. /**
  362. * Disconnects from the server.
  363. * @param userContext optional object used to pass context to the
  364. * callback. Use @em nullptr if not required.
  365. * @param cb callback listener that will be notified when the disconnect
  366. * completes.
  367. * @return token_ptr Token used to track and wait for disconnect to
  368. * complete. The token will be passed to the callback methods if
  369. * a callback is set.
  370. * @throw exception for problems encountered while disconnecting
  371. */
  372. token_ptr disconnect(void* userContext, iaction_listener& cb) override {
  373. return disconnect(0L, userContext, cb);
  374. }
  375. /**
  376. * Returns the delivery token for the specified message ID.
  377. * @return delivery_token
  378. */
  379. delivery_token_ptr get_pending_delivery_token(int msgID) const override;
  380. /**
  381. * Returns the delivery tokens for any outstanding publish operations.
  382. * @return delivery_token[]
  383. */
  384. std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
  385. /**
  386. * Returns the client ID used by this client.
  387. * @return The client ID used by this client.
  388. */
  389. string get_client_id() const override { return clientId_; }
  390. /**
  391. * Returns the address of the server used by this client.
  392. * @return The server's address, as a URI String.
  393. */
  394. string get_server_uri() const override { return serverURI_; }
  395. /**
  396. * Determines if this client is currently connected to the server.
  397. * @return true if connected, false otherwise.
  398. */
  399. bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
  400. /**
  401. * Publishes a message to a topic on the server
  402. * @param topic The topic to deliver the message to
  403. * @param payload the bytes to use as the message payload
  404. * @param n the number of bytes in the payload
  405. * @param qos the Quality of Service to deliver the message at. Valid
  406. * values are 0, 1 or 2.
  407. * @param retained whether or not this message should be retained by the
  408. * server.
  409. * @return token used to track and wait for the publish to complete. The
  410. * token will be passed to callback methods if set.
  411. */
  412. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n,
  413. int qos, bool retained) override;
  414. /**
  415. * Publishes a message to a topic on the server
  416. * @param topic The topic to deliver the message to
  417. * @param payload the bytes to use as the message payload
  418. * @param n the number of bytes in the payload
  419. * @return token used to track and wait for the publish to complete. The
  420. * token will be passed to callback methods if set.
  421. */
  422. delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
  423. return publish(std::move(topic), payload, n,
  424. message::DFLT_QOS, message::DFLT_RETAINED);
  425. }
  426. /**
  427. * Publishes a message to a topic on the server
  428. * @param topic The topic to deliver the message to
  429. * @param payload the bytes to use as the message payload
  430. * @param qos the Quality of Service to deliver the message at. Valid
  431. * values are 0, 1 or 2.
  432. * @param retained whether or not this message should be retained by the
  433. * server.
  434. * @return token used to track and wait for the publish to complete. The
  435. * token will be passed to callback methods if set.
  436. */
  437. delivery_token_ptr publish(string_ref topic, binary_ref payload,
  438. int qos, bool retained) override;
  439. /**
  440. * Publishes a message to a topic on the server
  441. * @param topic The topic to deliver the message to
  442. * @param payload the bytes to use as the message payload
  443. * @return token used to track and wait for the publish to complete. The
  444. * token will be passed to callback methods if set.
  445. */
  446. delivery_token_ptr publish(string_ref topic, binary_ref payload) override {
  447. return publish(std::move(topic), std::move(payload),
  448. message::DFLT_QOS, message::DFLT_RETAINED);
  449. }
  450. /**
  451. * Publishes a message to a topic on the server
  452. * @param topic The topic to deliver the message to
  453. * @param payload the bytes to use as the message payload
  454. * @param n the number of bytes in the payload
  455. * @param qos the Quality of Service to deliver the message at. Valid
  456. * values are 0, 1 or 2.
  457. * @param retained whether or not this message should be retained by the
  458. * server.
  459. * @param userContext optional object used to pass context to the
  460. * callback. Use @em nullptr if not required.
  461. * @param cb
  462. * @return token used to track and wait for the publish to complete. The
  463. * token will be passed to callback methods if set.
  464. */
  465. delivery_token_ptr publish(string_ref topic,
  466. const void* payload, size_t n,
  467. int qos, bool retained,
  468. void* userContext, iaction_listener& cb) override;
  469. /**
  470. * Publishes a message to a topic on the server Takes an Message
  471. * message and delivers it to the server at the requested quality of
  472. * service.
  473. * @param msg the message to deliver to the server
  474. * @return token used to track and wait for the publish to complete. The
  475. * token will be passed to callback methods if set.
  476. */
  477. delivery_token_ptr publish(const_message_ptr msg) override;
  478. /**
  479. * Publishes a message to a topic on the server.
  480. * @param msg the message to deliver to the server
  481. * @param userContext optional object used to pass context to the
  482. * callback. Use @em nullptr if not required.
  483. * @param cb callback optional listener that will be notified when message
  484. * delivery has completed to the requested quality of
  485. * service
  486. * @return token used to track and wait for the publish to complete. The
  487. * token will be passed to callback methods if set.
  488. */
  489. delivery_token_ptr publish(const_message_ptr msg,
  490. void* userContext, iaction_listener& cb) override;
  491. /**
  492. * Subscribe to a topic, which may include wildcards.
  493. * @param topicFilter the topic to subscribe to, which can include
  494. * wildcards.
  495. * @param qos
  496. *
  497. * @return token used to track and wait for the subscribe to complete.
  498. * The token will be passed to callback methods if set.
  499. */
  500. token_ptr subscribe(const string& topicFilter, int qos,
  501. const subscribe_options& opts=subscribe_options()) override;
  502. /**
  503. * Subscribe to a topic, which may include wildcards.
  504. * @param topicFilter the topic to subscribe to, which can include
  505. * wildcards.
  506. * @param qos the maximum quality of service at which to subscribe.
  507. * Messages published at a lower quality of service will be
  508. * received at the published QoS. Messages published at a
  509. * higher quality of service will be received using the QoS
  510. * specified on the subscribe.
  511. * @param userContext optional object used to pass context to the
  512. * callback. Use @em nullptr if not required.
  513. * @param cb listener that will be notified when subscribe has completed
  514. * @return token used to track and wait for the subscribe to complete.
  515. * The token will be passed to callback methods if set.
  516. */
  517. token_ptr subscribe(const string& topicFilter, int qos,
  518. void* userContext, iaction_listener& cb,
  519. const subscribe_options& opts=subscribe_options()) override;
  520. /**
  521. * Subscribe to multiple topics, each of which may include wildcards.
  522. * @param topicFilters
  523. * @param qos the maximum quality of service at which to subscribe.
  524. * Messages published at a lower quality of service will be
  525. * received at the published QoS. Messages published at a
  526. * higher quality of service will be received using the QoS
  527. * specified on the subscribe.
  528. * @return token used to track and wait for the subscribe to complete.
  529. * The token will be passed to callback methods if set.
  530. */
  531. token_ptr subscribe(const_string_collection_ptr topicFilters,
  532. const qos_collection& qos,
  533. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>()) override;
  534. /**
  535. * Subscribes to multiple topics, each of which may include wildcards.
  536. * @param topicFilters
  537. * @param qos the maximum quality of service at which to subscribe.
  538. * Messages published at a lower quality of service will be
  539. * received at the published QoS. Messages published at a
  540. * higher quality of service will be received using the QoS
  541. * specified on the subscribe.
  542. * @param userContext optional object used to pass context to the
  543. * callback. Use @em nullptr if not required.
  544. * @param cb listener that will be notified when subscribe has completed
  545. * @return token used to track and wait for the subscribe to complete.
  546. * The token will be passed to callback methods if set.
  547. */
  548. token_ptr subscribe(const_string_collection_ptr topicFilters,
  549. const qos_collection& qos,
  550. void* userContext, iaction_listener& cb,
  551. const std::vector<subscribe_options>& opts=std::vector<subscribe_options>()) override;
  552. /**
  553. * Requests the server unsubscribe the client from a topic.
  554. * @param topicFilter the topic to unsubscribe from. It must match a
  555. * topicFilter specified on an earlier subscribe.
  556. * @return token used to track and wait for the unsubscribe to complete.
  557. * The token will be passed to callback methods if set.
  558. */
  559. token_ptr unsubscribe(const string& topicFilter) override;
  560. /**
  561. * Requests the server unsubscribe the client from one or more topics.
  562. * @param topicFilters one or more topics to unsubscribe from. Each
  563. * topicFilter must match one specified on an
  564. * earlier subscribe.
  565. * @return token used to track and wait for the unsubscribe to complete.
  566. * The token will be passed to callback methods if set.
  567. */
  568. token_ptr unsubscribe(const_string_collection_ptr topicFilters) override;
  569. /**
  570. * Requests the server unsubscribe the client from one or more topics.
  571. * @param topicFilters
  572. * @param userContext optional object used to pass context to the
  573. * callback. Use @em nullptr if not required.
  574. * @param cb listener that will be notified when unsubscribe has
  575. * completed
  576. * @return token used to track and wait for the unsubscribe to complete.
  577. * The token will be passed to callback methods if set.
  578. */
  579. token_ptr unsubscribe(const_string_collection_ptr topicFilters,
  580. void* userContext, iaction_listener& cb) override;
  581. /**
  582. * Requests the server unsubscribe the client from a topics.
  583. * @param topicFilter the topic to unsubscribe from. It must match a
  584. * topicFilter specified on an earlier subscribe.
  585. * @param userContext optional object used to pass context to the
  586. * callback. Use @em nullptr if not required.
  587. * @param cb listener that will be notified when unsubscribe has
  588. * completed
  589. * @return token used to track and wait for the unsubscribe to complete.
  590. * The token will be passed to callback methods if set.
  591. */
  592. token_ptr unsubscribe(const string& topicFilter,
  593. void* userContext, iaction_listener& cb) override;
  594. /**
  595. * Start consuming messages.
  596. * This initializes the client to receive messages through a queue that
  597. * can be read synchronously.
  598. */
  599. void start_consuming();
  600. /**
  601. * Stop consuming messages.
  602. * This shuts down the internal callback and discards any unread
  603. * messages.
  604. */
  605. void stop_consuming();
  606. /**
  607. * Read the next message from the queue.
  608. * This blocks until a new message arrives.
  609. * @return The message and topic.
  610. */
  611. const_message_ptr consume_message() { return que_->get(); }
  612. /**
  613. * Try to read the next message from the queue without blocking.
  614. * @param msg Pointer to the value to receive the message
  615. * @return @em true is a message was read, @em false if no message was
  616. * available.
  617. */
  618. bool try_consume_message(const_message_ptr* msg) {
  619. return que_->try_get(msg);
  620. }
  621. /**
  622. * Waits a limited time for a message to arrive.
  623. * @param msg Pointer to the value to receive the message
  624. * @param relTime The maximum amount of time to wait for a message.
  625. * @return @em true if a message was read, @em false if a timeout
  626. * occurred.
  627. */
  628. template <typename Rep, class Period>
  629. bool try_consume_message_for(const_message_ptr* msg,
  630. const std::chrono::duration<Rep, Period>& relTime) {
  631. return que_->try_get_for(msg, relTime);
  632. }
  633. /**
  634. * Waits a limited time for a message to arrive.
  635. * @param relTime The maximum amount of time to wait for a message.
  636. * @return A shared pointer to the message that was received. It will be
  637. * empty on timeout.
  638. */
  639. template <typename Rep, class Period>
  640. const_message_ptr try_consume_message_for(const std::chrono::duration<Rep, Period>& relTime) {
  641. const_message_ptr msg;
  642. que_->try_get_for(&msg, relTime);
  643. return msg;
  644. }
  645. /**
  646. * Waits until a specific time for a message to appear.
  647. * @param msg Pointer to the value to receive the message
  648. * @param absTime The time point to wait until, before timing out.
  649. * @return @em true if a message was read, @em false if a timeout
  650. * occurred.
  651. */
  652. template <class Clock, class Duration>
  653. bool try_consume_message_until(const_message_ptr* msg,
  654. const std::chrono::time_point<Clock,Duration>& absTime) {
  655. return que_->try_get_until(msg, absTime);
  656. }
  657. /**
  658. * Waits until a specific time for a message to appear.
  659. * @param msg Pointer to the value to receive the message
  660. * @param absTime The time point to wait until, before timing out.
  661. * @return @em true if a message was read, @em false if a timeout
  662. * occurred.
  663. */
  664. template <class Clock, class Duration>
  665. const_message_ptr try_consume_message_until(const std::chrono::time_point<Clock,Duration>& absTime) {
  666. const_message_ptr msg;
  667. que_->try_get_until(msg, absTime);
  668. return msg;
  669. }
  670. };
  671. /** Smart/shared pointer to an asynchronous MQTT client object */
  672. using async_client_ptr = async_client::ptr_t;
  673. /////////////////////////////////////////////////////////////////////////////
  674. // end namespace mqtt
  675. }
  676. #endif // __mqtt_async_client_h