async_client.cpp 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824
  1. // async_client.cpp
  2. /*******************************************************************************
  3. * Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
  4. *
  5. * All rights reserved. This program and the accompanying materials
  6. * are made available under the terms of the Eclipse Public License v1.0
  7. * and Eclipse Distribution License v1.0 which accompany this distribution.
  8. *
  9. * The Eclipse Public License is available at
  10. * http://www.eclipse.org/legal/epl-v10.html
  11. * and the Eclipse Distribution License is available at
  12. * http://www.eclipse.org/org/documents/edl-v10.php.
  13. *
  14. * Contributors:
  15. * Frank Pagliughi - initial implementation and documentation
  16. * Frank Pagliughi - MQTT v5 support
  17. *******************************************************************************/
  18. #include "mqtt/async_client.h"
  19. #include "mqtt/token.h"
  20. #include "mqtt/message.h"
  21. #include "mqtt/response_options.h"
  22. #include "mqtt/disconnect_options.h"
  23. #include <thread>
  24. #include <mutex>
  25. #include <condition_variable>
  26. #include <chrono>
  27. #include <cstring>
  28. #include <cstdio>
  29. // TODO: Delete this when #680 is merged into the Paho C lib
  30. #if !defined(MQTTAsync_createOptions_initializer5)
  31. #define MQTTAsync_createOptions_initializer5 { {'M', 'Q', 'C', 'O'}, 0, 0, 100, MQTTVERSION_5 }
  32. #endif
  33. /////////////////////////////////////////////////////////////////////////////
  34. // Paho C logger
  35. enum LOG_LEVELS {
  36. INVALID_LEVEL = -1,
  37. TRACE_MAX = 1,
  38. TRACE_MED,
  39. TRACE_MIN,
  40. TRACE_PROTOCOL,
  41. LOG_PROTOCOL = TRACE_PROTOCOL,
  42. LOG_ERROR,
  43. LOG_SEVERE,
  44. LOG_FATAL,
  45. };
  46. extern "C" {
  47. void Log(enum LOG_LEVELS, int, const char *, ...);
  48. }
  49. /////////////////////////////////////////////////////////////////////////////
  50. namespace mqtt {
  51. /////////////////////////////////////////////////////////////////////////////
  52. // Constructors
  53. async_client::async_client(const string& serverURI, const string& clientId,
  54. const string& persistDir)
  55. : async_client(serverURI, clientId, 0, persistDir)
  56. {
  57. }
  58. async_client::async_client(const string& serverURI, const string& clientId,
  59. iclient_persistence* persistence /*=nullptr*/)
  60. : async_client(serverURI, clientId, 0, persistence)
  61. {
  62. }
  63. async_client::async_client(const string& serverURI, const string& clientId,
  64. int maxBufferedMessages, const string& persistDir)
  65. : serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT),
  66. persist_(nullptr), userCallback_(nullptr)
  67. {
  68. MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5;
  69. if (maxBufferedMessages != 0) {
  70. opts.sendWhileDisconnected = to_int(true);
  71. opts.maxBufferedMessages = maxBufferedMessages;
  72. }
  73. int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
  74. MQTTCLIENT_PERSISTENCE_DEFAULT,
  75. const_cast<char*>(persistDir.c_str()),
  76. &opts);
  77. if (rc != 0)
  78. throw exception(rc);
  79. }
  80. async_client::async_client(const string& serverURI, const string& clientId,
  81. int maxBufferedMessages, iclient_persistence* persistence /*=nullptr*/)
  82. : serverURI_(serverURI), clientId_(clientId), mqttVersion_(MQTTVERSION_DEFAULT),
  83. persist_(nullptr), userCallback_(nullptr)
  84. {
  85. MQTTAsync_createOptions opts MQTTAsync_createOptions_initializer5;
  86. if (maxBufferedMessages != 0) {
  87. opts.sendWhileDisconnected = to_int(true);
  88. opts.maxBufferedMessages = maxBufferedMessages;
  89. }
  90. int rc = MQTTASYNC_SUCCESS;
  91. if (!persistence) {
  92. rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
  93. MQTTCLIENT_PERSISTENCE_NONE, nullptr, &opts);
  94. }
  95. else {
  96. persist_.reset(new MQTTClient_persistence {
  97. persistence,
  98. &iclient_persistence::persistence_open,
  99. &iclient_persistence::persistence_close,
  100. &iclient_persistence::persistence_put,
  101. &iclient_persistence::persistence_get,
  102. &iclient_persistence::persistence_remove,
  103. &iclient_persistence::persistence_keys,
  104. &iclient_persistence::persistence_clear,
  105. &iclient_persistence::persistence_containskey
  106. });
  107. rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
  108. MQTTCLIENT_PERSISTENCE_USER, persist_.get(),
  109. &opts);
  110. }
  111. if (rc != 0)
  112. throw exception(rc);
  113. }
  114. async_client::~async_client()
  115. {
  116. MQTTAsync_destroy(&cli_);
  117. }
  118. // --------------------------------------------------------------------------
  119. // Class static callbacks.
  120. // These are the callbacks directly from the C-lib. In each case the
  121. // 'context' should be the address of the async_client object that
  122. // registered the callback.
  123. // Callback for MQTTAsync_setConnected()
  124. // This is installed with the normal callbacks and with a call to
  125. // reconnect() to indicate that it succeeded. It is called after the token
  126. // is notified of success on a normal connect with callbacks.
  127. void async_client::on_connected(void* context, char* cause)
  128. {
  129. ::Log(TRACE_MIN, -1, "[cpp] on_connected");
  130. if (context) {
  131. async_client* cli = static_cast<async_client*>(context);
  132. callback* cb = cli->userCallback_;
  133. auto& connHandler = cli->connHandler_;
  134. string cause_str = cause ? string(cause) : string();
  135. if (cb)
  136. cb->connected(cause_str);
  137. if (connHandler)
  138. connHandler(cause_str);
  139. }
  140. }
  141. // Callback for when the connection is lost.
  142. // This is called from the MQTTAsync_connectionLost registered via
  143. // MQTTAsync_setCallbacks().
  144. void async_client::on_connection_lost(void *context, char *cause)
  145. {
  146. ::Log(TRACE_MIN, -1, "[cpp] on_connection lost");
  147. if (context) {
  148. async_client* cli = static_cast<async_client*>(context);
  149. callback* cb = cli->userCallback_;
  150. consumer_queue_type& que = cli->que_;
  151. auto& connLostHandler = cli->connLostHandler_;
  152. string cause_str = cause ? string(cause) : string();
  153. if (cb)
  154. cb->connection_lost(cause_str);
  155. if (connLostHandler)
  156. connLostHandler(cause_str);
  157. if (que)
  158. que->put(const_message_ptr{});
  159. }
  160. }
  161. // Callback for when a subscribed message arrives.
  162. // This is called from the MQTTAsync_messageArrived registered via
  163. // MQTTAsync_setCallbacks().
  164. int async_client::on_message_arrived(void* context, char* topicName, int topicLen,
  165. MQTTAsync_message* msg)
  166. {
  167. if (context) {
  168. async_client* cli = static_cast<async_client*>(context);
  169. callback* cb = cli->userCallback_;
  170. consumer_queue_type& que = cli->que_;
  171. message_handler& msgHandler = cli->msgHandler_;
  172. if (cb || que || msgHandler) {
  173. size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen);
  174. string topic(topicName, topicName+len);
  175. auto m = message::create(std::move(topic), *msg);
  176. if (msgHandler)
  177. msgHandler(m);
  178. if (cb)
  179. cb->message_arrived(m);
  180. if (que)
  181. que->put(m);
  182. }
  183. }
  184. MQTTAsync_freeMessage(&msg);
  185. MQTTAsync_free(topicName);
  186. // TODO: Should the user code determine the return value?
  187. // The Java version does doesn't seem to...
  188. return to_int(true);
  189. }
  190. // Callback to indicate that a message was delivered to the server.
  191. // It is called for a message with a QOS >= 1, but it happens before the
  192. // on_success() call for the token. Thus we don't have the underlying
  193. // MQTTAsync_token of the outgoing message at the time of this callback.
  194. //
  195. // *** So using the Async C library we have no way to match this msgID with
  196. // a delivery_token object. So this is useless to us.
  197. //
  198. // So, all in all, this callback in its current implementation seems rather
  199. // redundant.
  200. //
  201. #if 0
  202. void async_client::on_delivery_complete(void* context, MQTTAsync_token msgID)
  203. {
  204. if (context) {
  205. async_client* m = static_cast<async_client*>(context);
  206. callback* cb = m->get_callback();
  207. if (cb) {
  208. delivery_token_ptr tok = m->get_pending_delivery_token(msgID);
  209. cb->delivery_complete(tok);
  210. }
  211. }
  212. }
  213. #endif
  214. // --------------------------------------------------------------------------
  215. // Private methods
  216. void async_client::add_token(token_ptr tok)
  217. {
  218. if (tok) {
  219. guard g(lock_);
  220. pendingTokens_.push_back(tok);
  221. }
  222. }
  223. void async_client::add_token(delivery_token_ptr tok)
  224. {
  225. if (tok) {
  226. guard g(lock_);
  227. pendingDeliveryTokens_.push_back(tok);
  228. }
  229. }
  230. // Note that we uniquely identify a token by the address of its raw pointer,
  231. // since the message ID is not unique.
  232. void async_client::remove_token(token* tok)
  233. {
  234. if (!tok)
  235. return;
  236. guard g(lock_);
  237. for (auto p=pendingDeliveryTokens_.begin();
  238. p!=pendingDeliveryTokens_.end(); ++p) {
  239. if (p->get() == tok) {
  240. delivery_token_ptr dtok = *p;
  241. pendingDeliveryTokens_.erase(p);
  242. // If there's a user callback registered, we can now call
  243. // delivery_complete()
  244. if (userCallback_) {
  245. const_message_ptr msg = dtok->get_message();
  246. if (msg && msg->get_qos() > 0) {
  247. callback* cb = userCallback_;
  248. g.unlock();
  249. cb->delivery_complete(dtok);
  250. }
  251. }
  252. return;
  253. }
  254. }
  255. for (auto p=pendingTokens_.begin(); p!=pendingTokens_.end(); ++p) {
  256. if (p->get() == tok) {
  257. pendingTokens_.erase(p);
  258. return;
  259. }
  260. }
  261. }
  262. // --------------------------------------------------------------------------
  263. // Callback management
  264. void async_client::set_callback(callback& cb)
  265. {
  266. guard g(lock_);
  267. userCallback_ = &cb;
  268. int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected);
  269. if (rc == MQTTASYNC_SUCCESS) {
  270. rc = MQTTAsync_setCallbacks(cli_, this,
  271. &async_client::on_connection_lost,
  272. &async_client::on_message_arrived,
  273. nullptr /*&async_client::on_delivery_complete*/);
  274. }
  275. else
  276. MQTTAsync_setConnected(cli_, nullptr, nullptr);
  277. if (rc != MQTTASYNC_SUCCESS) {
  278. userCallback_ = nullptr;
  279. throw exception(rc);
  280. }
  281. }
  282. void async_client::disable_callbacks()
  283. {
  284. // TODO: It would be nice to disable callbacks at the C library level,
  285. // but the setCallback function currently does not accept a nullptr for
  286. // the "message arrived" parameter. So, for now we send it an empty
  287. // lambda function.
  288. int rc = MQTTAsync_setCallbacks(cli_, this, nullptr,
  289. [](void*,char*,int,MQTTAsync_message*) -> int {return to_int(true);},
  290. nullptr);
  291. if (rc != MQTTASYNC_SUCCESS)
  292. throw exception(rc);
  293. }
  294. void async_client::set_connected_handler(connection_handler cb)
  295. {
  296. connHandler_ = cb;
  297. check_ret(::MQTTAsync_setConnected(cli_, this,
  298. &async_client::on_connected));
  299. }
  300. void async_client::set_connection_lost_handler(connection_handler cb)
  301. {
  302. connLostHandler_ = cb;
  303. check_ret(::MQTTAsync_setConnectionLostCallback(cli_, this,
  304. &async_client::on_connection_lost));
  305. }
  306. void async_client::set_message_callback(message_handler cb)
  307. {
  308. msgHandler_ = cb;
  309. check_ret(::MQTTAsync_setMessageArrivedCallback(cli_, this,
  310. &async_client::on_message_arrived));
  311. }
  312. // --------------------------------------------------------------------------
  313. // Connect
  314. token_ptr async_client::connect()
  315. {
  316. return connect(connect_options());
  317. }
  318. token_ptr async_client::connect(connect_options opts)
  319. {
  320. // TODO: We really should get (or update) this value from the response
  321. // (when the server confirms the requested version)
  322. mqttVersion_ = opts.opts_.MQTTVersion;
  323. // TODO: If connTok_ is non-null, there could be a pending connect
  324. // which might complete after creating/assigning a new one. If that
  325. // happened, the callback would have the context address of the previous
  326. // token which was destroyed. So for now, keep the old one alive within
  327. // this function, and check the behavior of the C library...
  328. auto tmpTok = connTok_;
  329. connTok_ = token::create(token::Type::CONNECT, *this);
  330. add_token(connTok_);
  331. opts.set_token(connTok_);
  332. int rc = MQTTAsync_connect(cli_, &opts.opts_);
  333. if (rc != MQTTASYNC_SUCCESS) {
  334. remove_token(connTok_);
  335. connTok_.reset();
  336. throw exception(rc);
  337. }
  338. return connTok_;
  339. }
  340. token_ptr async_client::connect(connect_options opts, void* userContext,
  341. iaction_listener& cb)
  342. {
  343. // TODO: We really should get this value from the response (when
  344. // the server confirms the requested version)
  345. mqttVersion_ = opts.opts_.MQTTVersion;
  346. auto tmpTok = connTok_;
  347. connTok_ = token::create(token::Type::CONNECT, *this, userContext, cb);
  348. add_token(connTok_);
  349. opts.set_token(connTok_);
  350. int rc = MQTTAsync_connect(cli_, &opts.opts_);
  351. if (rc != MQTTASYNC_SUCCESS) {
  352. remove_token(connTok_);
  353. connTok_.reset();
  354. throw exception(rc);
  355. }
  356. return connTok_;
  357. }
  358. // --------------------------------------------------------------------------
  359. // Re-connect
  360. token_ptr async_client::reconnect()
  361. {
  362. auto tok = connTok_;
  363. if (!tok)
  364. throw exception(MQTTASYNC_FAILURE, "Can't reconnect before a successful connect");
  365. tok->reset();
  366. add_token(tok);
  367. int rc = MQTTAsync_setConnected(cli_, this, &async_client::on_connected);
  368. if (rc == MQTTASYNC_SUCCESS)
  369. rc = MQTTAsync_reconnect(cli_);
  370. if (rc != MQTTASYNC_SUCCESS) {
  371. remove_token(tok);
  372. throw exception(rc);
  373. }
  374. return tok;
  375. }
  376. // --------------------------------------------------------------------------
  377. // Disconnect
  378. token_ptr async_client::disconnect(disconnect_options opts)
  379. {
  380. auto tok = token::create(token::Type::DISCONNECT, *this);
  381. add_token(tok);
  382. opts.set_token(tok, mqttVersion_);
  383. int rc = MQTTAsync_disconnect(cli_, &opts.opts_);
  384. if (rc != MQTTASYNC_SUCCESS) {
  385. remove_token(tok);
  386. throw exception(rc);
  387. }
  388. return tok;
  389. }
  390. token_ptr async_client::disconnect(int timeout, void* userContext, iaction_listener& cb)
  391. {
  392. auto tok = token::create(token::Type::DISCONNECT, *this, userContext, cb);
  393. add_token(tok);
  394. disconnect_options opts(timeout);
  395. opts.set_token(tok, mqttVersion_);
  396. int rc = MQTTAsync_disconnect(cli_, &opts.opts_);
  397. if (rc != MQTTASYNC_SUCCESS) {
  398. remove_token(tok);
  399. throw exception(rc);
  400. }
  401. return tok;
  402. }
  403. // --------------------------------------------------------------------------
  404. // Queries
  405. delivery_token_ptr async_client::get_pending_delivery_token(int msgID) const
  406. {
  407. // Messages with QOS=1 or QOS=2 that require a response/acknowledge should
  408. // have a non-zero 16-bit message ID. The library keeps the token objects
  409. // for all of these messages that are in flight. When the acknowledge comes
  410. // back from the broker, the C++ library can look up the token from the
  411. // msgID and signal it, indicating completion.
  412. if (msgID > 0) {
  413. guard g(lock_);
  414. for (const auto& t : pendingDeliveryTokens_) {
  415. if (t->get_message_id() == msgID)
  416. return t;
  417. }
  418. }
  419. return delivery_token_ptr();
  420. }
  421. std::vector<delivery_token_ptr> async_client::get_pending_delivery_tokens() const
  422. {
  423. std::vector<delivery_token_ptr> toks;
  424. guard g(lock_);
  425. for (const auto& t : pendingDeliveryTokens_) {
  426. if (t->get_message_id() > 0) {
  427. toks.push_back(t);
  428. }
  429. }
  430. return toks;
  431. }
  432. // --------------------------------------------------------------------------
  433. // Publish
  434. delivery_token_ptr async_client::publish(string_ref topic, const void* payload,
  435. size_t n, int qos, bool retained)
  436. {
  437. auto msg = message::create(std::move(topic), payload, n, qos, retained);
  438. return publish(std::move(msg));
  439. }
  440. delivery_token_ptr async_client::publish(string_ref topic, binary_ref payload,
  441. int qos, bool retained)
  442. {
  443. auto msg = message::create(std::move(topic), std::move(payload), qos, retained);
  444. return publish(std::move(msg));
  445. }
  446. delivery_token_ptr async_client::publish(string_ref topic,
  447. const void* payload, size_t n,
  448. int qos, bool retained, void* userContext,
  449. iaction_listener& cb)
  450. {
  451. auto msg = message::create(std::move(topic), payload, n, qos, retained);
  452. return publish(std::move(msg), userContext, cb);
  453. }
  454. delivery_token_ptr async_client::publish(const_message_ptr msg)
  455. {
  456. auto tok = delivery_token::create(*this, msg);
  457. add_token(tok);
  458. delivery_response_options rspOpts(tok, mqttVersion_);
  459. int rc = MQTTAsync_sendMessage(cli_, msg->get_topic().c_str(),
  460. &(msg->msg_), &rspOpts.opts_);
  461. if (rc == MQTTASYNC_SUCCESS) {
  462. tok->set_message_id(rspOpts.opts_.token);
  463. }
  464. else {
  465. remove_token(tok);
  466. throw exception(rc);
  467. }
  468. return tok;
  469. }
  470. delivery_token_ptr async_client::publish(const_message_ptr msg,
  471. void* userContext, iaction_listener& cb)
  472. {
  473. delivery_token_ptr tok = delivery_token::create(*this, msg, userContext, cb);
  474. add_token(tok);
  475. delivery_response_options rspOpts(tok, mqttVersion_);
  476. int rc = MQTTAsync_sendMessage(cli_, msg->get_topic().c_str(),
  477. &(msg->msg_), &rspOpts.opts_);
  478. if (rc == MQTTASYNC_SUCCESS) {
  479. tok->set_message_id(rspOpts.opts_.token);
  480. }
  481. else {
  482. remove_token(tok);
  483. throw exception(rc);
  484. }
  485. return tok;
  486. }
  487. // --------------------------------------------------------------------------
  488. // Subscribe
  489. token_ptr async_client::subscribe(const string& topicFilter, int qos,
  490. const subscribe_options& opts /*=subscribe_options()*/)
  491. {
  492. auto tok = token::create(token::Type::SUBSCRIBE, *this, topicFilter);
  493. tok->set_num_expected(0); // Indicates non-array response for single val
  494. add_token(tok);
  495. response_options rspOpts(tok, mqttVersion_);
  496. rspOpts.set_subscribe_options(opts);
  497. int rc = MQTTAsync_subscribe(cli_, topicFilter.c_str(), qos, &rspOpts.opts_);
  498. if (rc != MQTTASYNC_SUCCESS) {
  499. remove_token(tok);
  500. throw exception(rc);
  501. }
  502. return tok;
  503. }
  504. token_ptr async_client::subscribe(const string& topicFilter, int qos,
  505. void* userContext, iaction_listener& cb,
  506. const subscribe_options& opts /*=subscribe_options()*/)
  507. {
  508. auto tok = token::create(token::Type::SUBSCRIBE, *this, topicFilter,
  509. userContext, cb);
  510. tok->set_num_expected(0);
  511. add_token(tok);
  512. response_options rspOpts(tok, mqttVersion_);
  513. rspOpts.set_subscribe_options(opts);
  514. int rc = MQTTAsync_subscribe(cli_, topicFilter.c_str(), qos, &rspOpts.opts_);
  515. if (rc != MQTTASYNC_SUCCESS) {
  516. remove_token(tok);
  517. throw exception(rc);
  518. }
  519. return tok;
  520. }
  521. token_ptr async_client::subscribe(const_string_collection_ptr topicFilters,
  522. const qos_collection& qos,
  523. const std::vector<subscribe_options>& opts
  524. /*=std::vector<subscribe_options>()*/)
  525. {
  526. size_t n = topicFilters->size();
  527. if (n != qos.size())
  528. throw std::invalid_argument("Collection sizes don't match");
  529. auto tok = token::create(token::Type::SUBSCRIBE, *this, topicFilters);
  530. tok->set_num_expected(n);
  531. add_token(tok);
  532. response_options rspOpts(tok, mqttVersion_);
  533. rspOpts.set_subscribe_options(opts);
  534. int rc = MQTTAsync_subscribeMany(cli_, int(n), topicFilters->c_arr(),
  535. const_cast<int*>(qos.data()), &rspOpts.opts_);
  536. if (rc != MQTTASYNC_SUCCESS) {
  537. remove_token(tok);
  538. throw exception(rc);
  539. }
  540. return tok;
  541. }
  542. token_ptr async_client::subscribe(const_string_collection_ptr topicFilters,
  543. const qos_collection& qos,
  544. void* userContext, iaction_listener& cb,
  545. const std::vector<subscribe_options>& opts
  546. /*=std::vector<subscribe_options>()*/)
  547. {
  548. size_t n = topicFilters->size();
  549. if (n != qos.size())
  550. throw std::invalid_argument("Collection sizes don't match");
  551. auto tok = token::create(token::Type::SUBSCRIBE, *this,
  552. topicFilters, userContext, cb);
  553. tok->set_num_expected(n);
  554. add_token(tok);
  555. response_options rspOpts(tok, mqttVersion_);
  556. rspOpts.set_subscribe_options(opts);
  557. int rc = MQTTAsync_subscribeMany(cli_, int(n), topicFilters->c_arr(),
  558. const_cast<int*>(qos.data()), &rspOpts.opts_);
  559. if (rc != MQTTASYNC_SUCCESS) {
  560. remove_token(tok);
  561. throw exception(rc);
  562. }
  563. return tok;
  564. }
  565. // --------------------------------------------------------------------------
  566. // Unsubscribe
  567. token_ptr async_client::unsubscribe(const string& topicFilter)
  568. {
  569. auto tok = token::create(token::Type::UNSUBSCRIBE, *this, topicFilter);
  570. tok->set_num_expected(0); // Indicates non-array response for single val
  571. add_token(tok);
  572. response_options rspOpts(tok, mqttVersion_);
  573. int rc = MQTTAsync_unsubscribe(cli_, topicFilter.c_str(), &rspOpts.opts_);
  574. if (rc != MQTTASYNC_SUCCESS) {
  575. remove_token(tok);
  576. throw exception(rc);
  577. }
  578. return tok;
  579. }
  580. token_ptr async_client::unsubscribe(const_string_collection_ptr topicFilters)
  581. {
  582. size_t n = topicFilters->size();
  583. auto tok = token::create(token::Type::UNSUBSCRIBE, *this, topicFilters);
  584. tok->set_num_expected(n);
  585. add_token(tok);
  586. response_options rspOpts(tok, mqttVersion_);
  587. int rc = MQTTAsync_unsubscribeMany(cli_, int(n),
  588. topicFilters->c_arr(), &rspOpts.opts_);
  589. if (rc != MQTTASYNC_SUCCESS) {
  590. remove_token(tok);
  591. throw exception(rc);
  592. }
  593. return tok;
  594. }
  595. token_ptr async_client::unsubscribe(const_string_collection_ptr topicFilters,
  596. void* userContext, iaction_listener& cb)
  597. {
  598. size_t n = topicFilters->size();
  599. auto tok = token::create(token::Type::UNSUBSCRIBE, *this, topicFilters,
  600. userContext, cb);
  601. tok->set_num_expected(n);
  602. add_token(tok);
  603. response_options rspOpts(tok, mqttVersion_);
  604. int rc = MQTTAsync_unsubscribeMany(cli_, int(n), topicFilters->c_arr(), &rspOpts.opts_);
  605. if (rc != MQTTASYNC_SUCCESS) {
  606. remove_token(tok);
  607. throw exception(rc);
  608. }
  609. return tok;
  610. }
  611. token_ptr async_client::unsubscribe(const string& topicFilter,
  612. void* userContext, iaction_listener& cb)
  613. {
  614. auto tok = token::create(token::Type::UNSUBSCRIBE , *this, topicFilter,
  615. userContext, cb);
  616. add_token(tok);
  617. response_options rspOpts(tok, mqttVersion_);
  618. int rc = MQTTAsync_unsubscribe(cli_, topicFilter.c_str(), &rspOpts.opts_);
  619. if (rc != MQTTASYNC_SUCCESS) {
  620. remove_token(tok);
  621. throw exception(rc);
  622. }
  623. return tok;
  624. }
  625. // --------------------------------------------------------------------------
  626. void async_client::start_consuming()
  627. {
  628. // Make sure callbacks don't happen while we update the que, etc
  629. disable_callbacks();
  630. // TODO: Should we replace user callback?
  631. //userCallback_ = nullptr;
  632. que_.reset(new thread_queue<const_message_ptr>);
  633. int rc = MQTTAsync_setCallbacks(cli_, this,
  634. &async_client::on_connection_lost,
  635. &async_client::on_message_arrived,
  636. nullptr);
  637. if (rc != MQTTASYNC_SUCCESS)
  638. throw exception(rc);
  639. }
  640. void async_client::stop_consuming()
  641. {
  642. try {
  643. disable_callbacks();
  644. que_.reset();
  645. }
  646. catch (...) {
  647. que_.reset();
  648. throw;
  649. }
  650. }
  651. /////////////////////////////////////////////////////////////////////////////
  652. // end namespace mqtt
  653. }