pub_speed_test.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // pub_speed_test.cpp
  2. //
  3. // Paho C++ sample client application to do a simple test of the speed at
  4. // which messages can be published.
  5. //
  6. /*******************************************************************************
  7. * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
  8. *
  9. * All rights reserved. This program and the accompanying materials
  10. * are made available under the terms of the Eclipse Public License v1.0
  11. * and Eclipse Distribution License v1.0 which accompany this distribution.
  12. *
  13. * The Eclipse Public License is available at
  14. * http://www.eclipse.org/legal/epl-v10.html
  15. * and the Eclipse Distribution License is available at
  16. * http://www.eclipse.org/org/documents/edl-v10.php.
  17. *
  18. * Contributors:
  19. * Frank Pagliughi - initial implementation and documentation
  20. *******************************************************************************/
  21. #include <iostream>
  22. #include <cstdlib>
  23. #include <string>
  24. #include <thread>
  25. #include <future>
  26. #include <atomic>
  27. #include <chrono>
  28. #include <cstring>
  29. #include "mqtt/async_client.h"
  30. #include "mqtt/thread_queue.h"
  31. using namespace std;
  32. using namespace std::chrono;
  33. const std::string DFLT_SERVER_ADDRESS { "tcp://localhost:1883" };
  34. const size_t DFLT_PAYLOAD_SIZE = 1024;
  35. const int DFLT_N_MSG = 1000,
  36. DFLT_QOS = 1;
  37. const string TOPIC {"test/speed"};
  38. const char* LWT_PAYLOAD = "pub_speed_test died unexpectedly.";
  39. // Queue for passing tokens to the wait thread
  40. mqtt::thread_queue<mqtt::delivery_token_ptr> que;
  41. // Get the current time on the steady clock
  42. steady_clock::time_point now() { return steady_clock::now(); }
  43. // Convert a duration to a count of milliseconds
  44. template <class Rep, class Period>
  45. int64_t msec(const std::chrono::duration<Rep, Period>& dur) {
  46. return (int64_t) duration_cast<milliseconds>(dur).count();
  47. }
  48. // --------------------------------------------------------------------------
  49. // Thread function will wait for all the tokens to complete.
  50. // Any exceptions thrown from here will be caught in main().
  51. void token_wait_func()
  52. {
  53. while (true) {
  54. mqtt::delivery_token_ptr tok = que.get();
  55. if (!tok) break;
  56. //cout.put('x');
  57. tok->wait();
  58. }
  59. }
  60. // --------------------------------------------------------------------------
  61. int main(int argc, char* argv[])
  62. {
  63. string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS;
  64. int nMsg = (argc > 2) ? atoi(argv[2]) : DFLT_N_MSG;
  65. size_t msgSz = (size_t) ((argc > 3) ? atol(argv[3]) : DFLT_PAYLOAD_SIZE);
  66. int qos = (argc > 4) ? atoi(argv[4]) : DFLT_QOS;
  67. cout << "Initializing for server '" << address << "'..." << flush;
  68. mqtt::async_client cli(address, "");
  69. mqtt::message willmsg(TOPIC, LWT_PAYLOAD, 1, true);
  70. mqtt::will_options will(willmsg);
  71. mqtt::connect_options connOpts;
  72. connOpts.set_clean_session(true);
  73. connOpts.set_will(will);
  74. // Create a payload
  75. mqtt::binary payload;
  76. for (size_t i=0; i<msgSz; ++i)
  77. payload.push_back('a' + i%26);
  78. cout << "OK" << endl;
  79. try {
  80. // Create the message (move payload into it)
  81. auto msg = mqtt::make_message(TOPIC, std::move(payload), qos, false);
  82. // Connect to the broker
  83. cout << "\nConnecting..." << flush;
  84. auto start = now();
  85. cli.connect(connOpts)->wait();
  86. auto end = now();
  87. cout << "OK" << endl;
  88. cout << "Connected in " << msec(end - start) << "ms" << endl;
  89. auto fut = std::async(launch::async, token_wait_func);
  90. // Publish the messages
  91. cout << "\nPublishing " << nMsg << " messages..." << flush;
  92. start = now();
  93. for (int i=0; i<nMsg; ++i) {
  94. auto dtok = cli.publish(msg);
  95. //cout.put('^');
  96. que.put(std::move(dtok));
  97. }
  98. auto pubend = now();
  99. que.put(mqtt::delivery_token_ptr());
  100. // Wait for all the tokens to complete
  101. fut.get();
  102. end = now();
  103. cout << "OK" << endl;
  104. auto ms = msec(pubend - start);
  105. cout << "Published in " << ms << "ms " << (nMsg/ms) << "k msg/sec" << endl;
  106. ms = msec(end - start);
  107. cout << "Acknowledged in " << ms << "ms " << (nMsg/ms) << "k msg/sec" << endl;
  108. // Disconnect
  109. cout << "\nDisconnecting..." << flush;
  110. start = now();
  111. cli.disconnect(seconds(10))->wait();
  112. end = now();
  113. cout << "OK" << endl;
  114. cout << "Disconnected in " << msec(end - start) << "ms" << endl;
  115. }
  116. catch (const mqtt::exception& exc) {
  117. que.put(mqtt::delivery_token_ptr{});
  118. cerr << exc.what() << endl;
  119. return 1;
  120. }
  121. return 0;
  122. }