123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- // rpc_math_cli.cpp
- //
- // This is a Paho MQTT v5 C++ sample application.
- //
- // It's an example of how to create a client for performing remote procedure
- // calls using MQTT with the 'response topic' and 'correlation data'
- // properties.
- //
- // The sample demonstrates:
- // - Connecting to an MQTT server/broker
- // - Using MQTT v5 properties
- // - Publishing RPC request messages
- // - Using asynchronous tokens
- // - Subscribing to reply topic
- //
- /*******************************************************************************
- * Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Frank Pagliughi - initial implementation and documentation
- *******************************************************************************/
- #include <iostream>
- #include <sstream>
- #include <cstdlib>
- #include <string>
- #include <thread> // For sleep
- #include <atomic>
- #include <chrono>
- #include <cstring>
- #include "mqtt/async_client.h"
- #include "mqtt/properties.h"
- using namespace std;
- using namespace std::chrono;
- const string SERVER_ADDRESS { "tcp://localhost:1883" };
- const auto TIMEOUT = std::chrono::seconds(10);
- /////////////////////////////////////////////////////////////////////////////
- int main(int argc, char* argv[])
- {
- if (argc < 4) {
- cout << "USAGE: rpc_math_cli <add|mult> <num1> <num2> [... numN]" << endl;
- return 1;
- }
- constexpr int QOS = 1;
- const string REQ_TOPIC_HDR { "requests/math/" };
- mqtt::async_client cli(SERVER_ADDRESS, "");
- mqtt::connect_options connopts;
- connopts.set_mqtt_version(MQTTVERSION_5);
- connopts.set_clean_start(true);
- cli.start_consuming();
- try {
- cout << "Connecting..." << flush;
- mqtt::token_ptr tok = cli.connect(connopts);
- auto connRsp = tok->get_connect_response();
- cout << "OK (" << connRsp.get_server_uri() << ")" << endl;
- // Since we gave an empty client ID, the server should create a
- // unique one for us and send it back as ASSIGNED_CLIENT_IDENTIFER
- // in the connect properties.
- string clientId = get<string>(connRsp.get_properties(),
- mqtt::property::ASSIGNED_CLIENT_IDENTIFER);
- // So now we can create a unique RPC response topic using
- // the assigned (unique) client ID.
- string repTopic = "replies/" + clientId + "/math";
- cout << " Reply topic: " << repTopic << endl;
- // Subscribe to the reply topic and verify the QoS
- tok = cli.subscribe(repTopic, QOS);
- tok->wait();
- if (int(tok->get_reason_code()) != QOS) {
- cerr << "Error: Server doesn't support reply QoS: ["
- << tok->get_reason_code() << "]" << endl;
- return 2;
- }
- // Create and send the request message
- string req { argv[1] },
- reqTopic { REQ_TOPIC_HDR + req };
- mqtt::properties props {
- { mqtt::property::RESPONSE_TOPIC, repTopic },
- { mqtt::property::CORRELATION_DATA, "1" }
- };
- ostringstream os;
- os << "[ ";
- for (int i=2; i<argc-1; ++i)
- os << argv[i] << ", ";
- os << argv[argc-1] << " ]";
- string reqArgs { os.str() };
- cout << "\nSending '" << req << "' request " << os.str() << "..." << flush;
- mqtt::message_ptr pubmsg = mqtt::make_message(reqTopic, reqArgs);
- pubmsg->set_qos(QOS);
- pubmsg->set_properties(props);
- cli.publish(pubmsg)->wait_for(TIMEOUT);
- cout << "OK" << endl;
- // Wait for reply.
- auto msg = cli.try_consume_message_for(seconds(5));
- if (!msg) {
- cerr << "Didn't receive a reply from the service." << endl;
- return 1;
- }
- cout << " Result: " << msg->to_string() << endl;
- // Unsubscribe
- cli.unsubscribe(repTopic)->wait();
- // Disconnect
- cout << "\nDisconnecting..." << flush;
- cli.disconnect()->wait();
- cout << "OK" << endl;
- }
- catch (const mqtt::exception& exc) {
- cerr << exc.what() << endl;
- return 1;
- }
- return 0;
- }
|