// rpc_math_cli.cpp
//
// This is a Paho MQTT v5 C++ sample application.
//
// It's an example of how to create a client for performing remote procedure
// calls using MQTT with the 'response topic' and 'correlation data'
// properties.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Using MQTT v5 properties
//  - Publishing RPC request messages
//  - Using asynchronous tokens
//  - Subscribing to reply topic
//

/*******************************************************************************
 * Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/
#include <iostream>
#include <sstream>
#include <cstdlib>
#include <string>
#include <thread>	// For sleep
#include <atomic>
#include <chrono>
#include <cstring>
#include <exception>

#include "mqtt/async_client.h"
#include "mqtt/properties.h"

using namespace std;
using namespace std::chrono;
  43. const string SERVER_ADDRESS { "tcp://localhost:1883" };
  44. const auto TIMEOUT = std::chrono::seconds(10);
  45. /////////////////////////////////////////////////////////////////////////////
  46. int main(int argc, char* argv[])
  47. {
  48. if (argc < 4) {
  49. cout << "USAGE: rpc_math_cli <add|mult> <num1> <num2> [... numN]" << endl;
  50. return 1;
  51. }
  52. constexpr int QOS = 1;
  53. const string REQ_TOPIC_HDR { "requests/math/" };
  54. mqtt::async_client cli(SERVER_ADDRESS, "");
  55. mqtt::connect_options connopts;
  56. connopts.set_mqtt_version(MQTTVERSION_5);
  57. connopts.set_clean_start(true);
  58. cli.start_consuming();
  59. try {
  60. cout << "Connecting..." << flush;
  61. mqtt::token_ptr tok = cli.connect(connopts);
  62. auto connRsp = tok->get_connect_response();
  63. cout << "OK (" << connRsp.get_server_uri() << ")" << endl;
  64. // Since we gave an empty client ID, the server should create a
  65. // unique one for us and send it back as ASSIGNED_CLIENT_IDENTIFER
  66. // in the connect properties.
  67. string clientId = get<string>(connRsp.get_properties(),
  68. mqtt::property::ASSIGNED_CLIENT_IDENTIFER);
  69. // So now we can create a unique RPC response topic using
  70. // the assigned (unique) client ID.
  71. string repTopic = "replies/" + clientId + "/math";
  72. cout << " Reply topic: " << repTopic << endl;
  73. // Subscribe to the reply topic and verify the QoS
  74. tok = cli.subscribe(repTopic, QOS);
  75. tok->wait();
  76. if (int(tok->get_reason_code()) != QOS) {
  77. cerr << "Error: Server doesn't support reply QoS: ["
  78. << tok->get_reason_code() << "]" << endl;
  79. return 2;
  80. }
  81. // Create and send the request message
  82. string req { argv[1] },
  83. reqTopic { REQ_TOPIC_HDR + req };
  84. mqtt::properties props {
  85. { mqtt::property::RESPONSE_TOPIC, repTopic },
  86. { mqtt::property::CORRELATION_DATA, "1" }
  87. };
  88. ostringstream os;
  89. os << "[ ";
  90. for (int i=2; i<argc-1; ++i)
  91. os << argv[i] << ", ";
  92. os << argv[argc-1] << " ]";
  93. string reqArgs { os.str() };
  94. cout << "\nSending '" << req << "' request " << os.str() << "..." << flush;
  95. mqtt::message_ptr pubmsg = mqtt::make_message(reqTopic, reqArgs);
  96. pubmsg->set_qos(QOS);
  97. pubmsg->set_properties(props);
  98. cli.publish(pubmsg)->wait_for(TIMEOUT);
  99. cout << "OK" << endl;
  100. // Wait for reply.
  101. auto msg = cli.try_consume_message_for(seconds(5));
  102. if (!msg) {
  103. cerr << "Didn't receive a reply from the service." << endl;
  104. return 1;
  105. }
  106. cout << " Result: " << msg->to_string() << endl;
  107. // Unsubscribe
  108. cli.unsubscribe(repTopic)->wait();
  109. // Disconnect
  110. cout << "\nDisconnecting..." << flush;
  111. cli.disconnect()->wait();
  112. cout << "OK" << endl;
  113. }
  114. catch (const mqtt::exception& exc) {
  115. cerr << exc.what() << endl;
  116. return 1;
  117. }
  118. return 0;
  119. }