sync_consume.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // sync_consume.cpp
  2. //
  3. // This is a Paho MQTT C++ client, sample application.
  4. //
  5. // This application is an MQTT consumer/subscriber using the C++ synchronous
  6. // client interface, which uses the queuing API to receive messages.
  7. //
  8. // The sample demonstrates:
  9. // - Connecting to an MQTT server/broker
  10. // - Subscribing to multiple topics
  11. // - Receiving messages through the queueing consumer API
  12. // - Recieving and acting upon commands via MQTT topics
  13. // - Manual reconnects
  14. // - Using a persistent (non-clean) session
  15. //
  16. /*******************************************************************************
  17. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  18. *
  19. * All rights reserved. This program and the accompanying materials
  20. * are made available under the terms of the Eclipse Public License v1.0
  21. * and Eclipse Distribution License v1.0 which accompany this distribution.
  22. *
  23. * The Eclipse Public License is available at
  24. * http://www.eclipse.org/legal/epl-v10.html
  25. * and the Eclipse Distribution License is available at
  26. * http://www.eclipse.org/org/documents/edl-v10.php.
  27. *
  28. * Contributors:
  29. * Frank Pagliughi - initial implementation and documentation
  30. *******************************************************************************/
  31. #include <iostream>
  32. #include <cstdlib>
  33. #include <string>
  34. #include <cstring>
  35. #include <cctype>
  36. #include <thread>
  37. #include <chrono>
  38. #include "mqtt/client.h"
  39. using namespace std;
  40. using namespace std::chrono;
  41. const string SERVER_ADDRESS { "tcp://localhost:1883" };
  42. const string CLIENT_ID { "sync_consume_cpp" };
  43. // --------------------------------------------------------------------------
  44. // Simple function to manually reconect a client.
  45. bool try_reconnect(mqtt::client& cli)
  46. {
  47. constexpr int N_ATTEMPT = 30;
  48. for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
  49. try {
  50. cli.reconnect();
  51. return true;
  52. }
  53. catch (const mqtt::exception&) {
  54. this_thread::sleep_for(seconds(1));
  55. }
  56. }
  57. return false;
  58. }
  59. /////////////////////////////////////////////////////////////////////////////
  60. int main(int argc, char* argv[])
  61. {
  62. mqtt::connect_options connOpts;
  63. connOpts.set_keep_alive_interval(20);
  64. connOpts.set_clean_session(false);
  65. mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
  66. const vector<string> TOPICS { "data/#", "command" };
  67. const vector<int> QOS { 0, 1 };
  68. try {
  69. cout << "Connecting to the MQTT server..." << flush;
  70. mqtt::connect_response rsp = cli.connect(connOpts);
  71. cout << "OK\n" << endl;
  72. if (!rsp.is_session_present()) {
  73. std::cout << "Subscribing to topics..." << std::flush;
  74. cli.subscribe(TOPICS, QOS);
  75. std::cout << "OK" << std::endl;
  76. }
  77. else {
  78. cout << "Session already present. Skipping subscribe." << std::endl;
  79. }
  80. // Consume messages
  81. while (true) {
  82. auto msg = cli.consume_message();
  83. if (!msg) {
  84. if (!cli.is_connected()) {
  85. cout << "Lost connection. Attempting reconnect" << endl;
  86. if (try_reconnect(cli)) {
  87. cli.subscribe(TOPICS, QOS);
  88. cout << "Reconnected" << endl;
  89. continue;
  90. }
  91. else {
  92. cout << "Reconnect failed." << endl;
  93. }
  94. }
  95. else {
  96. cout << "An error occurred retrieving messages." << endl;
  97. }
  98. break;
  99. }
  100. if (msg->get_topic() == "command" &&
  101. msg->to_string() == "exit") {
  102. cout << "Exit command received" << endl;
  103. break;
  104. }
  105. cout << msg->get_topic() << ": " << msg->to_string() << endl;
  106. }
  107. // Disconnect
  108. cout << "\nDisconnecting from the MQTT server..." << flush;
  109. cli.disconnect();
  110. cout << "OK" << endl;
  111. }
  112. catch (const mqtt::exception& exc) {
  113. cerr << exc.what() << endl;
  114. return 1;
  115. }
  116. return 0;
  117. }