async_subscribe.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. // async_subscribe.cpp
  2. //
  3. // This is a Paho MQTT C++ client, sample application.
  4. //
  5. // This application is an MQTT subscriber using the C++ asynchronous client
  6. // interface, employing callbacks to receive messages and status updates.
  7. //
  8. // The sample demonstrates:
  9. // - Connecting to an MQTT server/broker.
  10. // - Subscribing to a topic
  11. // - Receiving messages through the callback API
  12. // - Receiving network disconnect updates and attempting manual reconnects.
  13. // - Using a "clean session" and manually re-subscribing to topics on
  14. // reconnect.
  15. //
  16. /*******************************************************************************
  17. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  18. *
  19. * All rights reserved. This program and the accompanying materials
  20. * are made available under the terms of the Eclipse Public License v1.0
  21. * and Eclipse Distribution License v1.0 which accompany this distribution.
  22. *
  23. * The Eclipse Public License is available at
  24. * http://www.eclipse.org/legal/epl-v10.html
  25. * and the Eclipse Distribution License is available at
  26. * http://www.eclipse.org/org/documents/edl-v10.php.
  27. *
  28. * Contributors:
  29. * Frank Pagliughi - initial implementation and documentation
  30. *******************************************************************************/
  31. #include <iostream>
  32. #include <cstdlib>
  33. #include <string>
  34. #include <cstring>
  35. #include <cctype>
  36. #include <thread>
  37. #include <chrono>
  38. #include "mqtt/async_client.h"
  39. const std::string SERVER_ADDRESS("tcp://localhost:1883");
  40. const std::string CLIENT_ID("async_subcribe_cpp");
  41. const std::string TOPIC("hello");
  42. const int QOS = 1;
  43. const int N_RETRY_ATTEMPTS = 5;
  44. /////////////////////////////////////////////////////////////////////////////
  45. // Callbacks for the success or failures of requested actions.
  46. // This could be used to initiate further action, but here we just log the
  47. // results to the console.
  48. class action_listener : public virtual mqtt::iaction_listener
  49. {
  50. std::string name_;
  51. void on_failure(const mqtt::token& tok) override {
  52. std::cout << name_ << " failure";
  53. if (tok.get_message_id() != 0)
  54. std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
  55. std::cout << std::endl;
  56. }
  57. void on_success(const mqtt::token& tok) override {
  58. std::cout << name_ << " success";
  59. if (tok.get_message_id() != 0)
  60. std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
  61. auto top = tok.get_topics();
  62. if (top && !top->empty())
  63. std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
  64. std::cout << std::endl;
  65. }
  66. public:
  67. action_listener(const std::string& name) : name_(name) {}
  68. };
  69. /////////////////////////////////////////////////////////////////////////////
  70. /**
  71. * Local callback & listener class for use with the client connection.
  72. * This is primarily intended to receive messages, but it will also monitor
  73. * the connection to the broker. If the connection is lost, it will attempt
  74. * to restore the connection and re-subscribe to the topic.
  75. */
  76. class callback : public virtual mqtt::callback,
  77. public virtual mqtt::iaction_listener
  78. {
  79. // Counter for the number of connection retries
  80. int nretry_;
  81. // The MQTT client
  82. mqtt::async_client& cli_;
  83. // Options to use if we need to reconnect
  84. mqtt::connect_options& connOpts_;
  85. // An action listener to display the result of actions.
  86. action_listener subListener_;
  87. // This deomonstrates manually reconnecting to the broker by calling
  88. // connect() again. This is a possibility for an application that keeps
  89. // a copy of it's original connect_options, or if the app wants to
  90. // reconnect with different options.
  91. // Another way this can be done manually, if using the same options, is
  92. // to just call the async_client::reconnect() method.
  93. void reconnect() {
  94. std::this_thread::sleep_for(std::chrono::milliseconds(2500));
  95. try {
  96. cli_.connect(connOpts_, nullptr, *this);
  97. }
  98. catch (const mqtt::exception& exc) {
  99. std::cerr << "Error: " << exc.what() << std::endl;
  100. exit(1);
  101. }
  102. }
  103. // Re-connection failure
  104. void on_failure(const mqtt::token& tok) override {
  105. std::cout << "Connection attempt failed" << std::endl;
  106. if (++nretry_ > N_RETRY_ATTEMPTS)
  107. exit(1);
  108. reconnect();
  109. }
  110. // (Re)connection success
  111. // Either this or connected() can be used for callbacks.
  112. void on_success(const mqtt::token& tok) override {}
  113. // (Re)connection success
  114. void connected(const std::string& cause) override {
  115. std::cout << "\nConnection success" << std::endl;
  116. std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
  117. << "\tfor client " << CLIENT_ID
  118. << " using QoS" << QOS << "\n"
  119. << "\nPress Q<Enter> to quit\n" << std::endl;
  120. cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
  121. }
  122. // Callback for when the connection is lost.
  123. // This will initiate the attempt to manually reconnect.
  124. void connection_lost(const std::string& cause) override {
  125. std::cout << "\nConnection lost" << std::endl;
  126. if (!cause.empty())
  127. std::cout << "\tcause: " << cause << std::endl;
  128. std::cout << "Reconnecting..." << std::endl;
  129. nretry_ = 0;
  130. reconnect();
  131. }
  132. // Callback for when a message arrives.
  133. void message_arrived(mqtt::const_message_ptr msg) override {
  134. std::cout << "Message arrived" << std::endl;
  135. std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
  136. std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
  137. }
  138. void delivery_complete(mqtt::delivery_token_ptr token) override {}
  139. public:
  140. callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
  141. : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}
  142. };
  143. /////////////////////////////////////////////////////////////////////////////
  144. int main(int argc, char* argv[])
  145. {
  146. mqtt::connect_options connOpts;
  147. connOpts.set_keep_alive_interval(20);
  148. connOpts.set_clean_session(true);
  149. mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
  150. callback cb(client, connOpts);
  151. client.set_callback(cb);
  152. // Start the connection.
  153. // When completed, the callback will subscribe to topic.
  154. try {
  155. std::cout << "Connecting to the MQTT server..." << std::flush;
  156. client.connect(connOpts, nullptr, cb);
  157. }
  158. catch (const mqtt::exception&) {
  159. std::cerr << "\nERROR: Unable to connect to MQTT server: '"
  160. << SERVER_ADDRESS << "'" << std::endl;
  161. return 1;
  162. }
  163. // Just block till user tells us to quit.
  164. while (std::tolower(std::cin.get()) != 'q')
  165. ;
  166. // Disconnect
  167. try {
  168. std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
  169. client.disconnect()->wait();
  170. std::cout << "OK" << std::endl;
  171. }
  172. catch (const mqtt::exception& exc) {
  173. std::cerr << exc.what() << std::endl;
  174. return 1;
  175. }
  176. return 0;
  177. }