token.cpp 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. // token.cpp
  2. /*******************************************************************************
  3. * Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
  4. *
  5. * All rights reserved. This program and the accompanying materials
  6. * are made available under the terms of the Eclipse Public License v1.0
  7. * and Eclipse Distribution License v1.0 which accompany this distribution.
  8. *
  9. * The Eclipse Public License is available at
  10. * http://www.eclipse.org/legal/epl-v10.html
  11. * and the Eclipse Distribution License is available at
  12. * http://www.eclipse.org/org/documents/edl-v10.php.
  13. *
  14. * Contributors:
  15. * Frank Pagliughi - initial implementation and documentation
  16. * Frank Pagliughi - MQTT v5 support
  17. *******************************************************************************/
  18. #include "mqtt/token.h"
  19. #include "mqtt/async_client.h"
  20. #include <cstring>
  21. #include <iostream>
  22. /////////////////////////////////////////////////////////////////////////////
  23. // Paho C logger
  24. enum LOG_LEVELS {
  25. INVALID_LEVEL = -1,
  26. TRACE_MAX = 1,
  27. TRACE_MED,
  28. TRACE_MIN,
  29. TRACE_PROTOCOL,
  30. LOG_PROTOCOL = TRACE_PROTOCOL,
  31. LOG_ERROR,
  32. LOG_SEVERE,
  33. LOG_FATAL,
  34. };
  35. extern "C" {
  36. void Log(enum LOG_LEVELS, int, const char *, ...);
  37. }
  38. /////////////////////////////////////////////////////////////////////////////
  39. namespace mqtt {
  40. // --------------------------------------------------------------------------
  41. // Constructors
  42. token::token(Type typ, iasync_client& cli, const_string_collection_ptr topics)
  43. : type_(typ), cli_(&cli), rc_(0), reasonCode_(ReasonCode::SUCCESS),
  44. msgId_(MQTTAsync_token(0)), topics_(topics),
  45. userContext_(nullptr), listener_(nullptr), nExpected_(0),
  46. complete_(false)
  47. {
  48. }
  49. token::token(Type typ, iasync_client& cli, const_string_collection_ptr topics,
  50. void* userContext, iaction_listener& cb)
  51. : type_(typ), cli_(&cli), rc_(0), reasonCode_(ReasonCode::SUCCESS),
  52. msgId_(MQTTAsync_token(0)), topics_(topics),
  53. userContext_(userContext), listener_(&cb), nExpected_(0),
  54. complete_(false)
  55. {
  56. }
  57. token::token(Type typ, iasync_client& cli, MQTTAsync_token tok)
  58. : type_(typ), cli_(&cli), rc_(0), reasonCode_(ReasonCode::SUCCESS),
  59. msgId_(tok), userContext_(nullptr),
  60. listener_(nullptr), nExpected_(0), complete_(false)
  61. {
  62. }
  63. // --------------------------------------------------------------------------
  64. // Class static callbacks.
  65. // These are the callbacks directly from the C library.
  66. // The 'context' is a raw pointer to the token object.
  67. void token::on_success(void* context, MQTTAsync_successData* rsp)
  68. {
  69. if (context)
  70. static_cast<token*>(context)->on_success(rsp);
  71. }
  72. void token::on_success5(void* context, MQTTAsync_successData5* rsp)
  73. {
  74. if (context)
  75. static_cast<token*>(context)->on_success5(rsp);
  76. }
  77. void token::on_failure(void* context, MQTTAsync_failureData* rsp)
  78. {
  79. if (context)
  80. static_cast<token*>(context)->on_failure(rsp);
  81. }
  82. void token::on_failure5(void* context, MQTTAsync_failureData5* rsp)
  83. {
  84. if (context)
  85. static_cast<token*>(context)->on_failure5(rsp);
  86. }
  87. // --------------------------------------------------------------------------
  88. // Object callbacks
  89. //
  90. // The success callback for MQTT v3 connections
  91. //
  92. void token::on_success(MQTTAsync_successData* rsp)
  93. {
  94. ::Log(TRACE_MIN, -1, "[cpp] on_success");
  95. unique_lock g(lock_);
  96. iaction_listener* listener = listener_;
  97. if (rsp) {
  98. msgId_ = rsp->token;
  99. switch (type_) {
  100. case Type::CONNECT:
  101. connRsp_.reset(new connect_response(rsp));
  102. break;
  103. case Type::SUBSCRIBE:
  104. subRsp_.reset(new subscribe_response(nExpected_, rsp));
  105. break;
  106. case Type::UNSUBSCRIBE:
  107. unsubRsp_.reset(new unsubscribe_response(rsp));
  108. break;
  109. default:
  110. // The others don't have responses
  111. break;
  112. }
  113. }
  114. rc_ = MQTTASYNC_SUCCESS;
  115. complete_ = true;
  116. g.unlock();
  117. // Note: callback always completes before the object is signaled.
  118. if (listener)
  119. listener->on_success(*this);
  120. cond_.notify_all();
  121. cli_->remove_token(this);
  122. }
  123. //
  124. // The success callback for MQTT v5 connections
  125. //
  126. void token::on_success5(MQTTAsync_successData5* rsp)
  127. {
  128. ::Log(TRACE_MIN, -1, "[cpp] on_success5");
  129. unique_lock g(lock_);
  130. iaction_listener* listener = listener_;
  131. if (rsp) {
  132. msgId_ = rsp->token;
  133. reasonCode_ = ReasonCode(rsp->reasonCode);
  134. switch (type_) {
  135. case Type::CONNECT:
  136. connRsp_.reset(new connect_response(rsp));
  137. break;
  138. case Type::SUBSCRIBE:
  139. subRsp_.reset(new subscribe_response(rsp));
  140. break;
  141. case Type::UNSUBSCRIBE:
  142. unsubRsp_.reset(new unsubscribe_response(rsp));
  143. break;
  144. default:
  145. // The others don't have responses
  146. break;
  147. }
  148. }
  149. rc_ = MQTTASYNC_SUCCESS;
  150. complete_ = true;
  151. g.unlock();
  152. // Note: callback always completes before the object is signaled.
  153. if (listener)
  154. listener->on_success(*this);
  155. cond_.notify_all();
  156. cli_->remove_token(this);
  157. }
  158. //
  159. // The failure callback for MQTT v3 connections
  160. //
  161. void token::on_failure(MQTTAsync_failureData* rsp)
  162. {
  163. ::Log(TRACE_MIN, -1, "[cpp] on_failure");
  164. unique_lock g(lock_);
  165. iaction_listener* listener = listener_;
  166. if (rsp) {
  167. msgId_ = rsp->token;
  168. rc_ = rsp->code;
  169. // HACK: For backward compatability with v3 connections
  170. reasonCode_ = ReasonCode(MQTTPP_V3_CODE);
  171. if (rsp->message)
  172. errMsg_ = string(rsp->message);
  173. }
  174. else {
  175. rc_ = -1;
  176. }
  177. complete_ = true;
  178. g.unlock();
  179. // Note: callback always completes before the obect is signaled.
  180. if (listener)
  181. listener->on_failure(*this);
  182. cond_.notify_all();
  183. cli_->remove_token(this);
  184. }
  185. //
  186. // The failure callback for MQTT v5 connections
  187. //
  188. void token::on_failure5(MQTTAsync_failureData5* rsp)
  189. {
  190. ::Log(TRACE_MIN, -1, "[cpp] on_failure5");
  191. unique_lock g(lock_);
  192. iaction_listener* listener = listener_;
  193. if (rsp) {
  194. msgId_ = rsp->token;
  195. reasonCode_ = ReasonCode(rsp->reasonCode);
  196. //props_ = properties(rsp->properties);
  197. rc_ = rsp->code;
  198. if (rsp->message)
  199. errMsg_ = string(rsp->message);
  200. }
  201. else {
  202. rc_ = -1;
  203. }
  204. complete_ = true;
  205. g.unlock();
  206. // Note: callback always completes before the obect is signaled.
  207. if (listener)
  208. listener->on_failure(*this);
  209. cond_.notify_all();
  210. cli_->remove_token(this);
  211. }
  212. // --------------------------------------------------------------------------
  213. // API
  214. void token::reset()
  215. {
  216. guard g(lock_);
  217. complete_ = false;
  218. rc_ = MQTTASYNC_SUCCESS;
  219. reasonCode_ = ReasonCode::SUCCESS;
  220. errMsg_.clear();
  221. }
  222. void token::wait()
  223. {
  224. unique_lock g(lock_);
  225. cond_.wait(g, [this]{return complete_;});
  226. check_ret();
  227. }
  228. connect_response token::get_connect_response() const
  229. {
  230. if (type_ != Type::CONNECT)
  231. throw bad_cast();
  232. unique_lock g(lock_);
  233. cond_.wait(g, [this]{return complete_;});
  234. check_ret();
  235. if (!connRsp_)
  236. throw missing_response("connect");
  237. return *connRsp_;
  238. }
  239. subscribe_response token::get_subscribe_response() const
  240. {
  241. if (type_ != Type::SUBSCRIBE)
  242. throw bad_cast();
  243. unique_lock g(lock_);
  244. cond_.wait(g, [this]{return complete_;});
  245. check_ret();
  246. if (!subRsp_)
  247. throw missing_response("subscribe");
  248. return *subRsp_;
  249. }
  250. unsubscribe_response token::get_unsubscribe_response() const
  251. {
  252. if (type_ != Type::UNSUBSCRIBE)
  253. throw bad_cast();
  254. unique_lock g(lock_);
  255. cond_.wait(g, [this]{return complete_;});
  256. check_ret();
  257. if (!unsubRsp_)
  258. throw missing_response("unsubscribe");
  259. return *unsubRsp_;
  260. }
  261. /////////////////////////////////////////////////////////////////////////////
  262. // end namespace mqtt
  263. }