async_consume.cpp 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  // async_consume.cpp
  //
  // This is a Paho MQTT C++ client sample application.
  //
  // This application is an MQTT consumer/subscriber using the C++
  // asynchronous client interface, employing the client's message
  // queue to receive messages and status updates.
  //
  // The sample demonstrates:
  //  - Connecting to an MQTT server/broker.
  //  - Subscribing to a topic.
  //  - Receiving messages through the synchronous queuing API.
  //
  14. /*******************************************************************************
  15. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  16. *
  17. * All rights reserved. This program and the accompanying materials
  18. * are made available under the terms of the Eclipse Public License v1.0
  19. * and Eclipse Distribution License v1.0 which accompany this distribution.
  20. *
  21. * The Eclipse Public License is available at
  22. * http://www.eclipse.org/legal/epl-v10.html
  23. * and the Eclipse Distribution License is available at
  24. * http://www.eclipse.org/org/documents/edl-v10.php.
  25. *
  26. * Contributors:
  27. * Frank Pagliughi - initial implementation and documentation
  28. *******************************************************************************/
  29. #include <iostream>
  30. #include <cstdlib>
  31. #include <string>
  32. #include <cstring>
  33. #include <cctype>
  34. #include <thread>
  35. #include <chrono>
  36. #include "mqtt/async_client.h"
  // The sample pulls in all of std for brevity; the code below relies on
  // the unqualified names string, cout, cerr, endl and flush.
  using namespace std;

  // URI of the MQTT broker the client connects to.
  const string SERVER_ADDRESS { "tcp://test.mosquitto.org:1883" };

  // Client identifier passed to the async_client constructor.
  const string CLIENT_ID { "async_consume" };

  // Topic that the sample subscribes to and later unsubscribes from.
  const string TOPIC { "baiyu" };

  // Quality-of-service level requested for the subscription.
  const int QOS = 1;

  /////////////////////////////////////////////////////////////////////////////
  43. int main(int argc, char* argv[])
  44. {
  45. mqtt::connect_options connOpts;
  46. connOpts.set_keep_alive_interval(20);
  47. connOpts.set_clean_session(true);
  48. mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
  49. try {
  50. cout << "Connecting to the MQTT server..." << flush;
  51. cli.connect(connOpts)->wait();
  52. cli.start_consuming();
  53. cli.subscribe(TOPIC, QOS)->wait();
  54. cout << "OK" << endl;
  55. // Consume messages
  56. while (true) {
  57. auto msg = cli.consume_message();
  58. if (msg->to_string() == "quit") break;
  59. cout << msg->get_topic() << ": " << msg->to_string() << endl;
  60. }
  61. // Send messages
  62. // Disconnect
  63. cout << "\nShutting down and disconnecting from the MQTT server..." << flush;
  64. cli.unsubscribe(TOPIC)->wait();
  65. cli.stop_consuming();
  66. cli.disconnect()->wait();
  67. cout << "OK" << endl;
  68. }
  69. catch (const mqtt::exception& exc) {
  70. cerr << exc.what() << endl;
  71. return 1;
  72. }
  73. return 0;
  74. }