async_publish.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. // async_publish.cpp
  2. //
  3. // This is a Paho MQTT C++ client, sample application.
  4. //
  5. // It's an example of how to send messages as an MQTT publisher using the
  6. // C++ asynchronous client interface.
  7. //
  8. // The sample demonstrates:
  9. // - Connecting to an MQTT server/broker
  10. // - Publishing messages
  11. // - Last will and testament
  12. // - Using asynchronous tokens
  13. // - Implementing callbacks and action listeners
  14. //
  15. /*******************************************************************************
  16. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  17. *
  18. * All rights reserved. This program and the accompanying materials
  19. * are made available under the terms of the Eclipse Public License v1.0
  20. * and Eclipse Distribution License v1.0 which accompany this distribution.
  21. *
  22. * The Eclipse Public License is available at
  23. * http://www.eclipse.org/legal/epl-v10.html
  24. * and the Eclipse Distribution License is available at
  25. * http://www.eclipse.org/org/documents/edl-v10.php.
  26. *
  27. * Contributors:
  28. * Frank Pagliughi - initial implementation and documentation
  29. *******************************************************************************/
  30. #include <iostream>
  31. #include <cstdlib>
  32. #include <string>
  33. #include <thread> // For sleep
  34. #include <atomic>
  35. #include <chrono>
  36. #include <cstring>
  37. #include "mqtt/async_client.h"
  38. using namespace std;
  39. const std::string DFLT_SERVER_ADDRESS { "tcp://test.mosquitto.org:1883" };
  40. const std::string DFLT_CLIENT_ID { "async_publish" };
  41. const string TOPIC { "baiyu" };
  42. const char* PAYLOAD1 = "Hello World!";
  43. const char* PAYLOAD2 = "Hi there!";
  44. const char* PAYLOAD3 = "Is anyone listening?";
  45. const char* PAYLOAD4 = "Someone is always listening.";
  46. const char* LWT_PAYLOAD = "Last will and testament.";
  47. const int QOS = 1;
  48. const auto TIMEOUT = std::chrono::seconds(10);
  49. /////////////////////////////////////////////////////////////////////////////
  50. /**
  51. * A callback class for use with the main MQTT client.
  52. */
  53. class callback : public virtual mqtt::callback
  54. {
  55. public:
  56. void connection_lost(const string& cause) override {
  57. cout << "\nConnection lost" << endl;
  58. if (!cause.empty())
  59. cout << "\tcause: " << cause << endl;
  60. }
  61. void delivery_complete(mqtt::delivery_token_ptr tok) override {
  62. cout << "\tDelivery complete for token: "
  63. << (tok ? tok->get_message_id() : -1) << endl;
  64. }
  65. };
  66. /////////////////////////////////////////////////////////////////////////////
  67. /**
  68. * A base action listener.
  69. */
  70. class action_listener : public virtual mqtt::iaction_listener
  71. {
  72. protected:
  73. void on_failure(const mqtt::token& tok) override {
  74. cout << "\tListener failure for token: "
  75. << tok.get_message_id() << endl;
  76. }
  77. void on_success(const mqtt::token& tok) override {
  78. cout << "\tListener success for token: "
  79. << tok.get_message_id() << endl;
  80. }
  81. };
  82. /////////////////////////////////////////////////////////////////////////////
  83. /**
  84. * A derived action listener for publish events.
  85. */
  86. class delivery_action_listener : public action_listener
  87. {
  88. atomic<bool> done_;
  89. void on_failure(const mqtt::token& tok) override {
  90. action_listener::on_failure(tok);
  91. done_ = true;
  92. }
  93. void on_success(const mqtt::token& tok) override {
  94. action_listener::on_success(tok);
  95. done_ = true;
  96. }
  97. public:
  98. delivery_action_listener() : done_(false) {}
  99. bool is_done() const { return done_; }
  100. };
  101. /////////////////////////////////////////////////////////////////////////////
  102. int main(int argc, char* argv[])
  103. {
  104. string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS,
  105. clientID = (argc > 2) ? string(argv[2]) : DFLT_CLIENT_ID;
  106. cout << "Initializing for server '" << address << "'..." << endl;
  107. mqtt::async_client client(address, clientID);
  108. callback cb;
  109. client.set_callback(cb);
  110. mqtt::connect_options conopts;
  111. mqtt::message willmsg(TOPIC, LWT_PAYLOAD, 1, true);
  112. mqtt::will_options will(willmsg);
  113. conopts.set_will(will);
  114. cout << " ...OK" << endl;
  115. try {
  116. cout << "\nConnecting..." << endl;
  117. mqtt::token_ptr conntok = client.connect(conopts);
  118. cout << "Waiting for the connection..." << endl;
  119. conntok->wait();
  120. cout << " ...OK" << endl;
  121. // First use a message pointer.
  122. cout << "\nSending message..." << endl;
  123. mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, PAYLOAD1);
  124. pubmsg->set_qos(QOS);
  125. client.publish(pubmsg)->wait_for(TIMEOUT);
  126. cout << " ...OK" << endl;
  127. // Now try with itemized publish.
  128. cout << "\nSending next message..." << endl;
  129. mqtt::delivery_token_ptr pubtok;
  130. pubtok = client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2), QOS, false);
  131. cout << " ...with token: " << pubtok->get_message_id() << endl;
  132. cout << " ...for message with " << pubtok->get_message()->get_payload().size()
  133. << " bytes" << endl;
  134. pubtok->wait_for(TIMEOUT);
  135. cout << " ...OK" << endl;
  136. // Now try with a listener
  137. cout << "\nSending next message..." << endl;
  138. action_listener listener;
  139. pubmsg = mqtt::make_message(TOPIC, PAYLOAD3);
  140. pubtok = client.publish(pubmsg, nullptr, listener);
  141. pubtok->wait();
  142. cout << " ...OK" << endl;
  143. // Finally try with a listener, but no token
  144. cout << "\nSending final message..." << endl;
  145. delivery_action_listener deliveryListener;
  146. pubmsg = mqtt::make_message(TOPIC, PAYLOAD4);
  147. client.publish(pubmsg, nullptr, deliveryListener);
  148. while (!deliveryListener.is_done()) {
  149. this_thread::sleep_for(std::chrono::milliseconds(100));
  150. }
  151. cout << "OK" << endl;
  152. // Double check that there are no pending tokens
  153. auto toks = client.get_pending_delivery_tokens();
  154. if (!toks.empty())
  155. cout << "Error: There are pending delivery tokens!" << endl;
  156. // Disconnect
  157. cout << "\nDisconnecting..." << endl;
  158. conntok = client.disconnect();
  159. conntok->wait();
  160. cout << " ...OK" << endl;
  161. }
  162. catch (const mqtt::exception& exc) {
  163. cerr << exc.what() << endl;
  164. return 1;
  165. }
  166. return 0;
  167. }