rpc_math_srvr.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // rpc_math_srvr.cpp
  2. //
  3. // This is a Paho MQTT C++ client, sample application.
  4. //
  5. // This application is an MQTT consumer/subscriber using the C++ synchronous
  6. // client interface, which uses the queuing API to receive messages.
  7. //
  8. // The sample demonstrates:
  9. // - Connecting to an MQTT server/broker
  10. // - Subscribing to multiple topics
  11. // - Receiving messages through the queueing consumer API
  12. // - Receiving and acting upon commands via MQTT topics
  13. // - Manual reconnects
  14. // - Using a persistent (non-clean) session
  15. //
  16. /*******************************************************************************
  17. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  18. *
  19. * All rights reserved. This program and the accompanying materials
  20. * are made available under the terms of the Eclipse Public License v1.0
  21. * and Eclipse Distribution License v1.0 which accompany this distribution.
  22. *
  23. * The Eclipse Public License is available at
  24. * http://www.eclipse.org/legal/epl-v10.html
  25. * and the Eclipse Distribution License is available at
  26. * http://www.eclipse.org/org/documents/edl-v10.php.
  27. *
  28. * Contributors:
  29. * Frank Pagliughi - initial implementation and documentation
  30. *******************************************************************************/
  31. #include <iostream>
  32. #include <sstream>
  33. #include <cstdlib>
  34. #include <string>
  35. #include <cstring>
  36. #include <cctype>
  37. #include <thread>
  38. #include <chrono>
  39. #include "mqtt/client.h"
  40. using namespace std;
  41. using namespace std::chrono;
  42. const string SERVER_ADDRESS { "tcp://localhost:1883" };
  43. const string CLIENT_ID { "rpc_math_srvr" };
  44. constexpr auto RESPONSE_TOPIC = mqtt::property::RESPONSE_TOPIC;
  45. constexpr auto CORRELATION_DATA = mqtt::property::CORRELATION_DATA;
  46. // --------------------------------------------------------------------------
  47. // Simple function to manually reconect a client.
  48. bool try_reconnect(mqtt::client& cli)
  49. {
  50. constexpr int N_ATTEMPT = 30;
  51. for (int i=0; i<N_ATTEMPT && !cli.is_connected(); ++i) {
  52. try {
  53. cli.reconnect();
  54. return true;
  55. }
  56. catch (const mqtt::exception&) {
  57. this_thread::sleep_for(seconds(1));
  58. }
  59. }
  60. return false;
  61. }
  62. // --------------------------------------------------------------------------
  63. // RPC function implementations
  64. double add(const std::vector<double>& nums)
  65. {
  66. double sum = 0.0;
  67. for (auto n : nums)
  68. sum += n;
  69. return sum;
  70. }
  71. double mult(const std::vector<double>& nums)
  72. {
  73. double prod = 1.0;
  74. for (auto n : nums)
  75. prod *= n;
  76. return prod;
  77. }
  78. /////////////////////////////////////////////////////////////////////////////
  79. int main(int argc, char* argv[])
  80. {
  81. mqtt::connect_options connOpts;
  82. connOpts.set_mqtt_version(MQTTVERSION_5);
  83. connOpts.set_keep_alive_interval(20);
  84. connOpts.set_clean_start(true);
  85. mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
  86. const vector<string> TOPICS { "requests/math", "requests/math/#" };
  87. const vector<int> QOS { 1, 1 };
  88. try {
  89. cout << "Connecting to the MQTT server..." << flush;
  90. cli.connect(connOpts);
  91. cli.subscribe(TOPICS, QOS);
  92. cout << "OK\n" << endl;
  93. // Consume messages
  94. while (true) {
  95. auto msg = cli.consume_message();
  96. if (!msg) {
  97. if (!cli.is_connected()) {
  98. cout << "Lost connection. Attempting reconnect" << endl;
  99. if (try_reconnect(cli)) {
  100. cli.subscribe(TOPICS, QOS);
  101. cout << "Reconnected" << endl;
  102. continue;
  103. }
  104. else {
  105. cout << "Reconnect failed." << endl;
  106. break;
  107. }
  108. }
  109. else
  110. break;
  111. }
  112. cout << "Received a request" << endl;
  113. const mqtt::properties& props = msg->get_properties();
  114. if (props.contains(RESPONSE_TOPIC) && props.contains(CORRELATION_DATA)) {
  115. mqtt::binary corr_id = mqtt::get<string>(props, CORRELATION_DATA);
  116. string reply_to = mqtt::get<string>(props, RESPONSE_TOPIC);
  117. cout << "Client wants a reply to [" << corr_id << "] on '"
  118. << reply_to << "'" << endl;
  119. cout << msg->get_topic() << ": " << msg->to_string() << endl;
  120. char c;
  121. double x;
  122. vector<double> nums;
  123. istringstream is(msg->to_string());
  124. if (!(is >> c) || c != '[') {
  125. cout << "Malformed arguments" << endl;
  126. // Maybe send an error message to client.
  127. continue;
  128. }
  129. c = ',';
  130. while (c == ',' && (is >> x >> c))
  131. nums.push_back(x);
  132. if (c != ']') {
  133. cout << "Bad closing delimiter" << endl;
  134. continue;
  135. }
  136. x = 0.0;
  137. if (msg->get_topic() == "requests/math/add")
  138. x = add(nums);
  139. else if (msg->get_topic() == "requests/math/mult")
  140. x = mult(nums);
  141. else {
  142. cout << "Unknown request: " << msg->get_topic() << endl;
  143. continue;
  144. }
  145. cout << " Result: " << x << endl;
  146. auto reply_msg = mqtt::message::create(reply_to, to_string(x), 1, false);
  147. cli.publish(reply_msg);
  148. }
  149. }
  150. // Disconnect
  151. cout << "\nDisconnecting from the MQTT server..." << flush;
  152. cli.disconnect();
  153. cout << "OK" << endl;
  154. }
  155. catch (const mqtt::exception& exc) {
  156. cerr << exc.what() << endl;
  157. return 1;
  158. }
  159. return 0;
  160. }