client.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. /////////////////////////////////////////////////////////////////////////////
  2. /// @file client.h
  3. /// Declaration of MQTT client class
  4. /// @date May 1, 2013
  5. /// @author Frank Pagliughi
  6. /////////////////////////////////////////////////////////////////////////////
  7. /*******************************************************************************
  8. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  9. *
  10. * All rights reserved. This program and the accompanying materials
  11. * are made available under the terms of the Eclipse Public License v1.0
  12. * and Eclipse Distribution License v1.0 which accompany this distribution.
  13. *
  14. * The Eclipse Public License is available at
  15. * http://www.eclipse.org/legal/epl-v10.html
  16. * and the Eclipse Distribution License is available at
  17. * http://www.eclipse.org/org/documents/edl-v10.php.
  18. *
  19. * Contributors:
  20. * Frank Pagliughi - initial implementation and documentation
  21. *******************************************************************************/
  22. #ifndef __mqtt_client_h
  23. #define __mqtt_client_h
  24. #include "mqtt/async_client.h"
  25. #include <future>
  26. namespace mqtt {
  27. /////////////////////////////////////////////////////////////////////////////
  28. /**
  29. * Lightweight client for talking to an MQTT server using methods that block
  30. * until an operation completes.
  31. */
  32. class client : private callback
  33. {
  34. /** An arbitrary, but relatively long timeout */
  35. static const std::chrono::minutes DFLT_TIMEOUT;
  36. /** The default quality of service */
  37. static constexpr int DFLT_QOS = 1;
  38. /** The actual client */
  39. async_client cli_;
  40. /** The longest time to wait for an operation to complete. */
  41. std::chrono::milliseconds timeout_;
  42. /** Callback supplied by the user (if any) */
  43. callback* userCallback_;
  44. /**
  45. * Creates a shared pointer to an existing non-heap object.
  46. * The shared pointer is given a no-op deleter, so it will not try to
  47. * destroy the object when it goes out of scope. It is up to the caller
  48. * to ensure that the object remains in memory for as long as there may
  49. * be pointers to it.
  50. * @param val A value which may live anywhere in memory (stack,
  51. * file-scope, etc).
  52. * @return A shared pointer to the object.
  53. */
  54. template <typename T>
  55. std::shared_ptr<T> ptr(const T& val) {
  56. return std::shared_ptr<T>(const_cast<T*>(&val), [](T*){});
  57. }
  58. // User callbacks
  59. // Most are launched in a separate thread, for convenience, except
  60. // message_arrived, for performance.
  61. void connected(const string& cause) override {
  62. std::async(std::launch::async, &callback::connected, userCallback_, cause);
  63. }
  64. void connection_lost(const string& cause) override {
  65. std::async(std::launch::async,
  66. &callback::connection_lost, userCallback_, cause);
  67. }
  68. void message_arrived(const_message_ptr msg) override {
  69. userCallback_->message_arrived(msg);
  70. }
  71. void delivery_complete(delivery_token_ptr tok) override {
  72. std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok);
  73. }
  74. /** Non-copyable */
  75. client() =delete;
  76. client(const async_client&) =delete;
  77. client& operator=(const async_client&) =delete;
  78. public:
  79. /** Smart pointer type for this object */
  80. using ptr_t = std::shared_ptr<client>;
  81. /** Type for a collection of QOS values */
  82. using qos_collection = async_client::qos_collection;
  83. /**
  84. * Create a client that can be used to communicate with an MQTT server.
  85. * This allows the caller to specify a user-defined persistence object,
  86. * or use no persistence.
  87. * @param serverURI the address of the server to connect to, specified
  88. * as a URI.
  89. * @param clientId a client identifier that is unique on the server
  90. * being connected to
  91. * @param persistence The user persistence structure. If this is null,
  92. * then no persistence is used.
  93. */
  94. client(const string& serverURI, const string& clientId,
  95. iclient_persistence* persistence=nullptr);
  96. /**
  97. * Create an async_client that can be used to communicate with an MQTT
  98. * server.
  99. * This uses file-based persistence in the specified directory.
  100. * @param serverURI the address of the server to connect to, specified
  101. * as a URI.
  102. * @param clientId a client identifier that is unique on the server
  103. * being connected to
  104. * @param persistDir The directory to use for persistence data
  105. */
  106. client(const string& serverURI, const string& clientId,
  107. const string& persistDir);
  108. /**
  109. * Create a client that can be used to communicate with an MQTT server,
  110. * which allows for off-line message buffering.
  111. * This allows the caller to specify a user-defined persistence object,
  112. * or use no persistence.
  113. * @param serverURI the address of the server to connect to, specified
  114. * as a URI.
  115. * @param clientId a client identifier that is unique on the server
  116. * being connected to
  117. * @param maxBufferedMessages the maximum number of messages allowed to
  118. * be buffered while not connected
  119. * @param persistence The user persistence structure. If this is null,
  120. * then no persistence is used.
  121. */
  122. client(const string& serverURI, const string& clientId,
  123. int maxBufferedMessages, iclient_persistence* persistence=nullptr);
  124. /**
  125. * Create a client that can be used to communicate with an MQTT server,
  126. * which allows for off-line message buffering.
  127. * This uses file-based persistence in the specified directory.
  128. * @param serverURI the address of the server to connect to, specified
  129. * as a URI.
  130. * @param clientId a client identifier that is unique on the server
  131. * being connected to
  132. * @param maxBufferedMessages the maximum number of messages allowed to
  133. * be buffered while not connected
  134. * @param persistDir The directory to use for persistence data
  135. */
  136. client(const string& serverURI, const string& clientId,
  137. int maxBufferedMessages, const string& persistDir);
  138. /**
  139. * Virtual destructor
  140. */
  141. virtual ~client() {}
  142. /**
  143. * Connects to an MQTT server using the default options.
  144. */
  145. virtual connect_response connect();
  146. /**
  147. * Connects to an MQTT server using the specified options.
  148. * @param opts
  149. */
  150. virtual connect_response connect(connect_options opts);
  151. /**
  152. * Reconnects the client using options from the previous connect.
  153. * The client must have previously called connect() for this to work.
  154. */
  155. virtual connect_response reconnect();
  156. /**
  157. * Disconnects from the server.
  158. */
  159. virtual void disconnect();
  160. /**
  161. * Disconnects from the server.
  162. * @param timeoutMS the amount of time in milliseconds to allow for
  163. * existing work to finish before disconnecting. A value
  164. * of zero or less means the client will not quiesce.
  165. */
  166. virtual void disconnect(int timeoutMS) {
  167. cli_.stop_consuming();
  168. cli_.disconnect(timeoutMS)->wait_for(timeout_);
  169. }
  170. /**
  171. * Disconnects from the server.
  172. * @param to the amount of time in milliseconds to allow for
  173. * existing work to finish before disconnecting. A value
  174. * of zero or less means the client will not quiesce.
  175. */
  176. template <class Rep, class Period>
  177. void disconnect(const std::chrono::duration<Rep, Period>& to) {
  178. disconnect((int) to_milliseconds_count(to));
  179. }
  180. /**
  181. * Gets the client ID used by this client.
  182. * @return The client ID used by this client.
  183. */
  184. virtual string get_client_id() const { return cli_.get_client_id(); }
  185. /**
  186. * Gets the address of the server used by this client.
  187. * @return The address of the server used by this client, as a URI.
  188. */
  189. virtual string get_server_uri() const { return cli_.get_server_uri(); }
  190. /**
  191. * Return the maximum time to wait for an action to complete.
  192. * @return int
  193. */
  194. virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
  195. /**
  196. * Get a topic object which can be used to publish messages on this
  197. * client.
  198. * @param top The topic name
  199. * @return A topic attached to this client.
  200. */
  201. virtual topic get_topic(const string& top) { return topic(cli_, top); }
  202. /**
  203. * Determines if this client is currently connected to the server.
  204. * @return @em true if the client is currently connected, @em false if
  205. * not.
  206. */
  207. virtual bool is_connected() const { return cli_.is_connected(); }
  208. /**
  209. * Publishes a message to a topic on the server and return once it is
  210. * delivered.
  211. * @param top The topic to publish
  212. * @param payload The data to publish
  213. * @param n The size in bytes of the data
  214. * @param qos The QoS for message delivery
  215. * @param retained Whether the broker should retain the message
  216. */
  217. virtual void publish(string_ref top, const void* payload, size_t n,
  218. int qos, bool retained) {
  219. cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_);
  220. }
  221. /**
  222. * Publishes a message to a topic on the server and return once it is
  223. * delivered.
  224. * @param top The topic to publish
  225. * @param payload The data to publish
  226. * @param n The size in bytes of the data
  227. */
  228. virtual void publish(string_ref top, const void* payload, size_t n) {
  229. cli_.publish(std::move(top), payload, n)->wait_for(timeout_);
  230. }
  231. /**
  232. * Publishes a message to a topic on the server.
  233. * @param msg The message
  234. */
  235. virtual void publish(const_message_ptr msg) {
  236. cli_.publish(msg)->wait_for(timeout_);
  237. }
  238. /**
  239. * Publishes a message to a topic on the server.
  240. * This version will not timeout since that could leave the library with
  241. * a reference to memory that could disappear while the library is still
  242. * using it.
  243. * @param msg The message
  244. */
  245. virtual void publish(const message& msg) {
  246. cli_.publish(ptr(msg))->wait();
  247. }
  248. /**
  249. * Sets the callback listener to use for events that happen
  250. * asynchronously.
  251. * @param cb The callback functions
  252. */
  253. virtual void set_callback(callback& cb);
  254. /**
  255. * Set the maximum time to wait for an action to complete.
  256. * @param timeoutMS The timeout in milliseconds
  257. */
  258. virtual void set_timeout(int timeoutMS) {
  259. timeout_ = std::chrono::milliseconds(timeoutMS);
  260. }
  261. /**
  262. * Set the maximum time to wait for an action to complete.
  263. * @param to The timeout as a std::chrono duration.
  264. */
  265. template <class Rep, class Period>
  266. void set_timeout(const std::chrono::duration<Rep, Period>& to) {
  267. timeout_ = to_milliseconds(to);
  268. }
  269. /**
  270. * Subscribe to a topic, which may include wildcards using a QoS of 1.
  271. * @param topicFilter
  272. */
  273. virtual subscribe_response subscribe(const string& topicFilter);
  274. /**
  275. * Subscribe to a topic, which may include wildcards.
  276. * @param topicFilter A single topic to subscribe
  277. * @param qos The QoS of the subscription
  278. */
  279. virtual subscribe_response subscribe(const string& topicFilter, int qos);
  280. /**
  281. * Subscribes to a one or more topics, which may include wildcards using
  282. * a QoS of 1.
  283. * @param topicFilters A set of topics to subscribe
  284. */
  285. virtual subscribe_response subscribe(const string_collection& topicFilters);
  286. /**
  287. * Subscribes to multiple topics, each of which may include wildcards.
  288. * @param topicFilters A collection of topics to subscribe
  289. * @param qos A collection of QoS for each topic
  290. */
  291. virtual subscribe_response subscribe(const string_collection& topicFilters,
  292. const qos_collection& qos);
  293. /**
  294. * Requests the server unsubscribe the client from a topic.
  295. * @param topicFilter A single topic to unsubscribe.
  296. */
  297. virtual unsubscribe_response unsubscribe(const string& topicFilter);
  298. /**
  299. * Requests the server unsubscribe the client from one or more topics.
  300. * @param topicFilters A collection of topics to unsubscribe.
  301. */
  302. virtual unsubscribe_response unsubscribe(const string_collection& topicFilters);
  303. /**
  304. * Start consuming messages.
  305. * This initializes the client to receive messages through a queue that
  306. * can be read synchronously.
  307. */
  308. void start_consuming() { cli_.start_consuming(); }
  309. /**
  310. * Stop consuming messages.
  311. * This shuts down the internal callback and discards any unread
  312. * messages.
  313. */
  314. void stop_consuming() { cli_.stop_consuming(); }
  315. /**
  316. * Read the next message from the queue.
  317. * This blocks until a new message arrives.
  318. * @return The message and topic.
  319. */
  320. const_message_ptr consume_message() { return cli_.consume_message(); }
  321. /**
  322. * Try to read the next message from the queue without blocking.
  323. * @param msg Pointer to the value to receive the message
  324. * @return @em true is a message was read, @em false if no message was
  325. * available.
  326. */
  327. bool try_consume_message(const_message_ptr* msg) {
  328. return cli_.try_consume_message(msg);
  329. }
  330. /**
  331. * Waits a limited time for a message to arrive.
  332. * @param msg Pointer to the value to receive the message
  333. * @param relTime The maximum amount of time to wait for a message.
  334. * @return @em true if a message was read, @em false if a timeout
  335. * occurred.
  336. */
  337. template <typename Rep, class Period>
  338. bool try_consume_message_for(const_message_ptr* msg,
  339. const std::chrono::duration<Rep, Period>& relTime) {
  340. return cli_.try_consume_message_for(msg, relTime);
  341. }
  342. /**
  343. * Waits until a specific time for a message to occur.
  344. * @param msg Pointer to the value to receive the message
  345. * @param absTime The time point to wait until, before timing out.
  346. * @return @em true if a message was read, @em false if a timeout
  347. * occurred.
  348. */
  349. template <class Clock, class Duration>
  350. bool try_consume_message_until(const_message_ptr* msg,
  351. const std::chrono::time_point<Clock,Duration>& absTime) {
  352. return cli_.try_consume_message_until(msg, absTime);
  353. }
  354. };
  355. /** Smart/shared pointer to an MQTT synchronous client object */
  356. using client_ptr = client::ptr_t;
  357. /////////////////////////////////////////////////////////////////////////////
  358. // end namespace mqtt
  359. }
  360. #endif // __mqtt_client_h