123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- // async_subscribe.cpp
- //
- // This is a Paho MQTT C++ client, sample application.
- //
- // This application is an MQTT subscriber using the C++ asynchronous client
- // interface, employing callbacks to receive messages and status updates.
- //
- // The sample demonstrates:
- // - Connecting to an MQTT server/broker.
- // - Subscribing to a topic
- // - Receiving messages through the callback API
- // - Receiving network disconnect updates and attempting manual reconnects.
- // - Using a "clean session" and manually re-subscribing to topics on
- // reconnect.
- //
- /*******************************************************************************
- * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Frank Pagliughi - initial implementation and documentation
- *******************************************************************************/
- #include <iostream>
- #include <cstdlib>
- #include <string>
- #include <cstring>
- #include <cctype>
- #include <thread>
- #include <chrono>
- #include "mqtt/async_client.h"
// URI of the MQTT broker handed to the client constructor.
const std::string SERVER_ADDRESS{"tcp://localhost:1883"};

// Client identifier handed to the client constructor. This is a runtime
// value, so its spelling is kept exactly as-is.
const std::string CLIENT_ID{"async_subcribe_cpp"};

// Topic that is subscribed to each time a connection is established.
const std::string TOPIC{"hello"};

// Quality-of-service level requested for the subscription.
constexpr int QOS = 1;

// Number of failed connection attempts tolerated before the program exits.
constexpr int N_RETRY_ATTEMPTS = 5;
- /////////////////////////////////////////////////////////////////////////////
- // Callbacks for the success or failures of requested actions.
- // This could be used to initiate further action, but here we just log the
- // results to the console.
- class action_listener : public virtual mqtt::iaction_listener
- {
- std::string name_;
- void on_failure(const mqtt::token& tok) override {
- std::cout << name_ << " failure";
- if (tok.get_message_id() != 0)
- std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
- std::cout << std::endl;
- }
- void on_success(const mqtt::token& tok) override {
- std::cout << name_ << " success";
- if (tok.get_message_id() != 0)
- std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
- auto top = tok.get_topics();
- if (top && !top->empty())
- std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
- std::cout << std::endl;
- }
- public:
- action_listener(const std::string& name) : name_(name) {}
- };
- /////////////////////////////////////////////////////////////////////////////
- /**
- * Local callback & listener class for use with the client connection.
- * This is primarily intended to receive messages, but it will also monitor
- * the connection to the broker. If the connection is lost, it will attempt
- * to restore the connection and re-subscribe to the topic.
- */
- class callback : public virtual mqtt::callback,
- public virtual mqtt::iaction_listener
- {
- // Counter for the number of connection retries
- int nretry_;
- // The MQTT client
- mqtt::async_client& cli_;
- // Options to use if we need to reconnect
- mqtt::connect_options& connOpts_;
- // An action listener to display the result of actions.
- action_listener subListener_;
- // This deomonstrates manually reconnecting to the broker by calling
- // connect() again. This is a possibility for an application that keeps
- // a copy of it's original connect_options, or if the app wants to
- // reconnect with different options.
- // Another way this can be done manually, if using the same options, is
- // to just call the async_client::reconnect() method.
- void reconnect() {
- std::this_thread::sleep_for(std::chrono::milliseconds(2500));
- try {
- cli_.connect(connOpts_, nullptr, *this);
- }
- catch (const mqtt::exception& exc) {
- std::cerr << "Error: " << exc.what() << std::endl;
- exit(1);
- }
- }
- // Re-connection failure
- void on_failure(const mqtt::token& tok) override {
- std::cout << "Connection attempt failed" << std::endl;
- if (++nretry_ > N_RETRY_ATTEMPTS)
- exit(1);
- reconnect();
- }
- // (Re)connection success
- // Either this or connected() can be used for callbacks.
- void on_success(const mqtt::token& tok) override {}
- // (Re)connection success
- void connected(const std::string& cause) override {
- std::cout << "\nConnection success" << std::endl;
- std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
- << "\tfor client " << CLIENT_ID
- << " using QoS" << QOS << "\n"
- << "\nPress Q<Enter> to quit\n" << std::endl;
- cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
- }
- // Callback for when the connection is lost.
- // This will initiate the attempt to manually reconnect.
- void connection_lost(const std::string& cause) override {
- std::cout << "\nConnection lost" << std::endl;
- if (!cause.empty())
- std::cout << "\tcause: " << cause << std::endl;
- std::cout << "Reconnecting..." << std::endl;
- nretry_ = 0;
- reconnect();
- }
- // Callback for when a message arrives.
- void message_arrived(mqtt::const_message_ptr msg) override {
- std::cout << "Message arrived" << std::endl;
- std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
- std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
- }
- void delivery_complete(mqtt::delivery_token_ptr token) override {}
- public:
- callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
- : nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}
- };
- /////////////////////////////////////////////////////////////////////////////
- int main(int argc, char* argv[])
- {
- mqtt::connect_options connOpts;
- connOpts.set_keep_alive_interval(20);
- connOpts.set_clean_session(true);
- mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
- callback cb(client, connOpts);
- client.set_callback(cb);
- // Start the connection.
- // When completed, the callback will subscribe to topic.
- try {
- std::cout << "Connecting to the MQTT server..." << std::flush;
- client.connect(connOpts, nullptr, cb);
- }
- catch (const mqtt::exception&) {
- std::cerr << "\nERROR: Unable to connect to MQTT server: '"
- << SERVER_ADDRESS << "'" << std::endl;
- return 1;
- }
- // Just block till user tells us to quit.
- while (std::tolower(std::cin.get()) != 'q')
- ;
- // Disconnect
- try {
- std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
- client.disconnect()->wait();
- std::cout << "OK" << std::endl;
- }
- catch (const mqtt::exception& exc) {
- std::cerr << exc.what() << std::endl;
- return 1;
- }
- return 0;
- }
|