async_publish_time.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. // async_publish_time.cpp
  2. //
  3. // This is a Paho MQTT C++ client, sample application.
  4. //
  5. // It's a fairly contrived, but useful example of an MQTT data monitor and
  6. // publisher, using the C++ asynchronous client interface. A fairly common
  7. // usage for MQTT applications is to monitor a sensor and publish the reading
  8. // when it changes by a "significant" amount (whatever that may be).
  9. // This might be temperature, pressure, humidity, soil moisture, CO2 levels,
  10. // or anything like that.
  11. //
  12. // Since we don't have a universal sensor to use for this example, we simply
  13. // use time itself as our input data. We periodically "sample" the time
  14. // value and when it changes by more than our required delta amount, we
  15. // publish the time. In this case we use the system clock, measuring the
  16. // time with millisecond precision.
  17. //
  18. // The sample demonstrates:
  19. // - Connecting to an MQTT server/broker
  20. // - Sampling a value
  21. // - Publishing messages
  22. // - Last will and testament
  23. // - Callbacks with lambdas
  24. // - Implementing callbacks and action listeners
  25. //
  26. /*******************************************************************************
  27. * Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
  28. *
  29. * All rights reserved. This program and the accompanying materials
  30. * are made available under the terms of the Eclipse Public License v1.0
  31. * and Eclipse Distribution License v1.0 which accompany this distribution.
  32. *
  33. * The Eclipse Public License is available at
  34. * http://www.eclipse.org/legal/epl-v10.html
  35. * and the Eclipse Distribution License is available at
  36. * http://www.eclipse.org/org/documents/edl-v10.php.
  37. *
  38. * Contributors:
  39. * Frank Pagliughi - initial implementation and documentation
  40. *******************************************************************************/
  41. #include <iostream>
  42. #include <cstdlib>
  43. #include <string>
  44. #include <thread> // For sleep
  45. #include <atomic>
  46. #include <chrono>
  47. #include <cstring>
  48. #include "mqtt/async_client.h"
  49. using namespace std;
  50. using namespace std::chrono;
  51. const std::string DFLT_SERVER_ADDRESS { "tcp://localhost:1883" };
  52. // The QoS for sending data
  53. const int QOS = 1;
  54. // How often to sample the "data"
  55. const auto SAMPLE_PERIOD = milliseconds(5);
  56. // How much the "data" needs to change before we publish a new value.
  57. const int DELTA_MS = 100;
  58. // How many to buffer while off-line
  59. const int MAX_BUFFERED_MESSAGES = 1200;
  60. // --------------------------------------------------------------------------
  61. // Gets the current time as the number of milliseconds since the epoch:
  62. // like a time_t with ms resolution.
  63. uint64_t timestamp()
  64. {
  65. auto now = system_clock::now();
  66. auto tse = now.time_since_epoch();
  67. auto msTm = duration_cast<milliseconds>(tse);
  68. return uint64_t(msTm.count());
  69. }
  70. // --------------------------------------------------------------------------
  71. int main(int argc, char* argv[])
  72. {
  73. string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS;
  74. uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL;
  75. cout << "Initializing for server '" << address << "'..." << endl;
  76. mqtt::async_client cli(address, "", MAX_BUFFERED_MESSAGES);
  77. // Set callbacks for connected and connection lost.
  78. cli.set_connected_handler([&cli](const std::string&) {
  79. std::cout << "*** Connected ("
  80. << timestamp() << ") ***" << std::endl;
  81. });
  82. cli.set_connection_lost_handler([&cli](const std::string&) {
  83. std::cout << "*** Connection Lost ("
  84. << timestamp() << ") ***" << std::endl;
  85. });
  86. mqtt::connect_options connopts;
  87. mqtt::message willmsg("test/events", "Time publisher disconnected", 1, true);
  88. mqtt::will_options will(willmsg);
  89. connopts.set_will(will);
  90. connopts.set_automatic_reconnect(1, 10);
  91. try {
  92. cout << "Connecting..." << endl;
  93. cli.connect(connopts)->wait();
  94. mqtt::topic top(cli, "data/time", QOS);
  95. cout << "Publishing data..." << endl;
  96. uint64_t t = timestamp(),
  97. tlast = t,
  98. tstart = t;
  99. top.publish(to_string(t));
  100. while (true) {
  101. this_thread::sleep_for(SAMPLE_PERIOD);
  102. t = timestamp();
  103. if (abs(int(t - tlast)) >= DELTA_MS)
  104. top.publish(to_string(tlast = t));
  105. if (trun > 0 && t >= (trun + tstart))
  106. break;
  107. }
  108. // Disconnect
  109. cout << "\nDisconnecting..." << endl;
  110. cli.disconnect()->wait();
  111. cout << " ...OK" << endl;
  112. }
  113. catch (const mqtt::exception& exc) {
  114. cerr << exc.what() << endl;
  115. return 1;
  116. }
  117. return 0;
  118. }