test95.c 66 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - correct some compile warnings
  16. * Ian Craggs - add binary will message test
  17. * Ian Craggs - MQTT V5 updates
  18. *******************************************************************************/
  19. /**
  20. * @file
  21. * Offline buffering and automatic reconnect tests for the Paho Asynchronous MQTT C client
  22. *
  23. */
  24. #include "MQTTAsync.h"
  25. #include <string.h>
  26. #include <stdlib.h>
  27. #include "Thread.h"
  28. #if !defined(_WINDOWS)
  29. #include <sys/time.h>
  30. #include <sys/socket.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #else
  34. #include <windows.h>
  35. #endif
  36. char unique[50]; // unique suffix/prefix to add to clientid/topic etc
  37. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  38. void usage(void)
  39. {
  40. printf("help!!\n");
  41. exit(EXIT_FAILURE);
  42. }
  43. struct Options
  44. {
  45. char* connection; /**< connection to system under test. */
  46. char* proxy_connection; /**< connection to proxy */
  47. int verbose;
  48. int test_no;
  49. } options =
  50. {
  51. "localhost:1883",
  52. "localhost:1884",
  53. 0,
  54. 0,
  55. };
  56. void getopts(int argc, char** argv)
  57. {
  58. int count = 1;
  59. while (count < argc)
  60. {
  61. if (strcmp(argv[count], "--test_no") == 0)
  62. {
  63. if (++count < argc)
  64. options.test_no = atoi(argv[count]);
  65. else
  66. usage();
  67. }
  68. else if (strcmp(argv[count], "--connection") == 0)
  69. {
  70. if (++count < argc)
  71. options.connection = argv[count];
  72. else
  73. usage();
  74. }
  75. else if (strcmp(argv[count], "--proxy_connection") == 0)
  76. {
  77. if (++count < argc)
  78. options.proxy_connection = argv[count];
  79. else
  80. usage();
  81. }
  82. else if (strcmp(argv[count], "--verbose") == 0)
  83. options.verbose = 1;
  84. count++;
  85. }
  86. }
  87. #define LOGA_DEBUG 0
  88. #define LOGA_INFO 1
  89. #include <stdarg.h>
  90. #include <time.h>
  91. #include <sys/timeb.h>
  92. void MyLog(int LOGA_level, char* format, ...)
  93. {
  94. static char msg_buf[256];
  95. va_list args;
  96. #if defined(_WIN32) || defined(_WINDOWS)
  97. struct timeb ts;
  98. #else
  99. struct timeval ts;
  100. #endif
  101. struct tm timeinfo;
  102. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  103. return;
  104. #if defined(_WIN32) || defined(_WINDOWS)
  105. ftime(&ts);
  106. localtime_s(&timeinfo, &ts.time);
  107. #else
  108. gettimeofday(&ts, NULL);
  109. localtime_r(&ts.tv_sec, &timeinfo);
  110. #endif
  111. strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
  112. #if defined(_WIN32) || defined(_WINDOWS)
  113. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  114. #else
  115. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000L);
  116. #endif
  117. va_start(args, format);
  118. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  119. va_end(args);
  120. printf("%s\n", msg_buf);
  121. fflush(stdout);
  122. }
  123. void MySleep(long milliseconds)
  124. {
  125. #if defined(_WIN32) || defined(_WIN64)
  126. Sleep(milliseconds);
  127. #else
  128. usleep(milliseconds*1000);
  129. #endif
  130. }
  131. #if defined(_WIN32) || defined(_WINDOWS)
  132. #define START_TIME_TYPE DWORD
  133. static DWORD start_time = 0;
  134. START_TIME_TYPE start_clock(void)
  135. {
  136. return GetTickCount();
  137. }
  138. #elif defined(AIX)
  139. #define START_TIME_TYPE struct timespec
  140. START_TIME_TYPE start_clock(void)
  141. {
  142. static struct timespec start;
  143. clock_gettime(CLOCK_REALTIME, &start);
  144. return start;
  145. }
  146. #else
  147. #define START_TIME_TYPE struct timeval
  148. /* TODO - unused - remove? static struct timeval start_time; */
  149. START_TIME_TYPE start_clock(void)
  150. {
  151. struct timeval start_time;
  152. gettimeofday(&start_time, NULL);
  153. return start_time;
  154. }
  155. #endif
  156. #if defined(_WIN32)
  157. long elapsed(START_TIME_TYPE start_time)
  158. {
  159. return GetTickCount() - start_time;
  160. }
  161. #elif defined(AIX)
  162. #define assert(a)
  163. long elapsed(struct timespec start)
  164. {
  165. struct timespec now, res;
  166. clock_gettime(CLOCK_REALTIME, &now);
  167. ntimersub(now, start, res);
  168. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  169. }
  170. #else
  171. long elapsed(START_TIME_TYPE start_time)
  172. {
  173. struct timeval now, res;
  174. gettimeofday(&now, NULL);
  175. timersub(&now, &start_time, &res);
  176. return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
  177. }
  178. #endif
  179. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  180. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  181. #define MAXMSGS 30;
  182. int tests = 0;
  183. int failures = 0;
  184. FILE* xml;
  185. START_TIME_TYPE global_start_time;
  186. char output[3000];
  187. char* cur_output = output;
  188. void write_test_result(void)
  189. {
  190. long duration = elapsed(global_start_time);
  191. fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
  192. if (cur_output != output)
  193. {
  194. fprintf(xml, "%s", output);
  195. cur_output = output;
  196. }
  197. fprintf(xml, "</testcase>\n");
  198. }
  199. void myassert(char* filename, int lineno, char* description, int value,
  200. char* format, ...)
  201. {
  202. ++tests;
  203. if (!value)
  204. {
  205. va_list args;
  206. ++failures;
  207. MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
  208. lineno, description);
  209. va_start(args, format);
  210. vprintf(format, args);
  211. va_end(args);
  212. cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
  213. description, filename, lineno);
  214. }
  215. else
  216. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
  217. filename, lineno, description);
  218. }
  219. void logProperties(MQTTProperties *props)
  220. {
  221. int i = 0;
  222. for (i = 0; i < props->count; ++i)
  223. {
  224. int id = props->array[i].identifier;
  225. const char* name = MQTTPropertyName(id);
  226. char* intformat = "Property name %s value %d";
  227. switch (MQTTProperty_getType(id))
  228. {
  229. case MQTTPROPERTY_TYPE_BYTE:
  230. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.byte);
  231. break;
  232. case MQTTPROPERTY_TYPE_TWO_BYTE_INTEGER:
  233. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer2);
  234. break;
  235. case MQTTPROPERTY_TYPE_FOUR_BYTE_INTEGER:
  236. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
  237. break;
  238. case MQTTPROPERTY_TYPE_VARIABLE_BYTE_INTEGER:
  239. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
  240. break;
  241. case MQTTPROPERTY_TYPE_BINARY_DATA:
  242. case MQTTPROPERTY_TYPE_UTF_8_ENCODED_STRING:
  243. MyLog(LOGA_DEBUG, "Property name %s value len %.*s", name,
  244. props->array[i].value.data.len, props->array[i].value.data.data);
  245. break;
  246. case MQTTPROPERTY_TYPE_UTF_8_STRING_PAIR:
  247. MyLog(LOGA_DEBUG, "Property name %s key %.*s value %.*s", name,
  248. props->array[i].value.data.len, props->array[i].value.data.data,
  249. props->array[i].value.value.len, props->array[i].value.value.data);
  250. break;
  251. }
  252. }
  253. }
  254. void waitForNoPendingTokens(MQTTAsync c)
  255. {
  256. int i = 0, rc = 0, count = 0;
  257. MQTTAsync_token *tokens;
  258. /* acks for outgoing messages could arrive after incoming exchanges are complete */
  259. do
  260. {
  261. rc = MQTTAsync_getPendingTokens(c, &tokens);
  262. assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  263. i = 0;
  264. if (tokens)
  265. {
  266. while (tokens[i] != -1)
  267. ++i;
  268. MQTTAsync_free(tokens);
  269. }
  270. if (i > 0)
  271. MySleep(100);
  272. }
  273. while (i > 0 && ++count < 10);
  274. assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
  275. }
  276. void assert3PendingTokens(MQTTAsync c)
  277. {
  278. int i = 0, rc = 0;
  279. MQTTAsync_token *tokens;
  280. rc = MQTTAsync_getPendingTokens(c, &tokens);
  281. assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  282. i = 0;
  283. if (tokens)
  284. {
  285. while (tokens[i] != -1)
  286. ++i;
  287. MQTTAsync_free(tokens);
  288. }
  289. assert("Number of getPendingTokens should be 3", i == 3, "i was %d ", i);
  290. }
  291. /*********************************************************************
  292. Tests: offline buffering - sending messages while disconnected
  293. 1. send some messages while disconnected, check that they are sent
  294. 2. repeat test 1 using serverURIs
  295. 3. repeat test 1 using auto reconnect
  296. 4. repeat test 2 using auto reconnect
  297. 5. check max-buffered
  298. 6. check auto-reconnect parms alter behaviour as expected
  299. Tests: automatic reconnect
  300. - check that connected() is called
  301. - check that reconnect() causes reconnect attempt
  302. - check that reconnect() fails if no connect has been previously attempted
  303. *********************************************************************/
  304. /*********************************************************************
  305. Test1: offline buffering - sending messages while disconnected
  306. 1. call connect
  307. 2. use proxy to disconnect the client
  308. 3. while the client is disconnected, send more messages
  309. 4. when the client reconnects, check that those messages are sent
  310. *********************************************************************/
  311. int test1_will_message_received = 0;
  312. int test1_messages_received = 0;
  313. int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  314. {
  315. MQTTAsync c = (MQTTAsync)context;
  316. static int message_count = 0;
  317. static int first = 1;
  318. if (first == 1)
  319. {
  320. first = 0;
  321. return 0; /* to force queue persistence */
  322. }
  323. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  324. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  325. test1_will_message_received = 1;
  326. else
  327. test1_messages_received++;
  328. if (message->struct_version == 1)
  329. {
  330. assert("Properties count should be > 0", message->properties.count > 0,
  331. "Properties count was %d\n", message->properties.count);
  332. logProperties(&message->properties);
  333. }
  334. MQTTAsync_freeMessage(&message);
  335. MQTTAsync_free(topicName);
  336. return 1;
  337. }
  338. int test1Finished = 0;
  339. int test1OnFailureCalled = 0;
  340. void test1cOnFailure(void* context, MQTTAsync_failureData5* response)
  341. {
  342. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  343. test1OnFailureCalled++;
  344. test1Finished = 1;
  345. }
  346. void test1dOnFailure(void* context, MQTTAsync_failureData5* response)
  347. {
  348. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  349. test1OnFailureCalled++;
  350. test1Finished = 1;
  351. }
  352. void test1cOnConnect(void* context, MQTTAsync_successData5* response)
  353. {
  354. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  355. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  356. MQTTAsync c = (MQTTAsync)context;
  357. int rc;
  358. static int done = 0;
  359. if (done == 0)
  360. {
  361. /* send a message to the proxy to break the connection */
  362. pubmsg.payload = "TERMINATE";
  363. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  364. pubmsg.qos = 0;
  365. pubmsg.retained = 0;
  366. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  367. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  368. done = 1; /* only do this once */
  369. }
  370. }
  371. int test1dReady = 0;
  372. char willTopic[100];
  373. char test_topic[100];
  374. void test1donSubscribe(void* context, MQTTAsync_successData5* response)
  375. {
  376. MQTTAsync c = (MQTTAsync)context;
  377. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  378. response->reasonCode);
  379. test1dReady = 1;
  380. }
  381. void test1dOnConnect(void* context, MQTTAsync_successData5* response)
  382. {
  383. MQTTAsync c = (MQTTAsync)context;
  384. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  385. int rc;
  386. int qoss[2] = {2, 2};
  387. char* topics[2] = {willTopic, test_topic};
  388. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  389. opts.onSuccess5 = test1donSubscribe;
  390. opts.context = c;
  391. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  392. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  393. if (rc != MQTTASYNC_SUCCESS)
  394. test1Finished = 1;
  395. }
  396. int test1c_connected = 0;
  397. void test1cConnected(void* context, char* cause)
  398. {
  399. MQTTAsync c = (MQTTAsync)context;
  400. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  401. test1c_connected = 1;
  402. }
  403. int test1(struct Options options)
  404. {
  405. char* testname = "test1";
  406. int subsqos = 2;
  407. MQTTAsync c, d;
  408. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  409. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  410. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  411. int rc = 0;
  412. int count = 0;
  413. char clientidc[70];
  414. char clientidd[70];
  415. int i = 0;
  416. MQTTProperties props = MQTTProperties_initializer;
  417. MQTTProperties willProps = MQTTProperties_initializer;
  418. MQTTProperty property;
  419. sprintf(willTopic, "paho-test95-1-%s", unique);
  420. sprintf(clientidc, "paho-test95-1-c-%s", unique);
  421. sprintf(clientidd, "paho-test95-1-d-%s", unique);
  422. sprintf(test_topic, "paho-test95-1-test topic %s", unique);
  423. test1Finished = 0;
  424. failures = 0;
  425. MyLog(LOGA_INFO, "Starting Offline buffering 1 - messages while disconnected");
  426. fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
  427. global_start_time = start_clock();
  428. createOptions.sendWhileDisconnected = 1;
  429. createOptions.MQTTVersion = MQTTVERSION_5;
  430. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc,
  431. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
  432. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  433. if (rc != MQTTASYNC_SUCCESS)
  434. {
  435. MQTTAsync_destroy(&c);
  436. goto exit;
  437. }
  438. createOptions.sendWhileDisconnected = 0;
  439. createOptions.MQTTVersion = MQTTVERSION_5;
  440. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd,
  441. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
  442. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  443. if (rc != MQTTASYNC_SUCCESS)
  444. {
  445. MQTTAsync_destroy(&c);
  446. goto exit;
  447. }
  448. opts.keepAliveInterval = 20;
  449. opts.cleanstart = 1;
  450. rc = MQTTAsync_setCallbacks(d, d, NULL, test1_messageArrived, NULL);
  451. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  452. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  453. opts.context = d;
  454. opts.onSuccess5 = test1dOnConnect;
  455. opts.onFailure5 = test1dOnFailure;
  456. MyLog(LOGA_DEBUG, "Connecting client d");
  457. rc = MQTTAsync_connect(d, &opts);
  458. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  459. if (rc != MQTTASYNC_SUCCESS)
  460. {
  461. failures++;
  462. goto exit;
  463. }
  464. /* wait until d is ready: connected and subscribed */
  465. count = 0;
  466. while (!test1dReady && ++count < 10000)
  467. MySleep(100);
  468. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  469. rc = MQTTAsync_setConnected(c, c, test1cConnected);
  470. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  471. /* let client c go: connect, and send disconnect command to proxy */
  472. opts.will = &wopts;
  473. opts.will->message = "will message";
  474. opts.will->qos = 1;
  475. opts.will->retained = 0;
  476. opts.will->topicName = willTopic;
  477. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  478. property.value.data.data = "test user property";
  479. property.value.data.len = (int)strlen(property.value.data.data);
  480. property.value.value.data = "test user property value";
  481. property.value.value.len = (int)strlen(property.value.value.data);
  482. MQTTProperties_add(&willProps, &property);
  483. opts.willProperties = &willProps;
  484. opts.onSuccess5 = test1cOnConnect;
  485. opts.onFailure5 = test1cOnFailure;
  486. opts.context = c;
  487. opts.cleanstart = 0;
  488. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  489. property.value.integer4 = 30;
  490. MQTTProperties_add(&props, &property);
  491. opts.connectProperties = &props;
  492. MyLog(LOGA_DEBUG, "Connecting client c");
  493. rc = MQTTAsync_connect(c, &opts);
  494. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  495. MQTTProperties_free(&props);
  496. MQTTProperties_free(&willProps);
  497. if (rc != MQTTASYNC_SUCCESS)
  498. {
  499. failures++;
  500. goto exit;
  501. }
  502. /* wait for will message */
  503. while (!test1_will_message_received && ++count < 10000)
  504. MySleep(100);
  505. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  506. test1c_connected = 0;
  507. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  508. for (i = 0; i < 3; ++i)
  509. {
  510. char buf[50];
  511. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  512. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  513. MQTTProperty property;
  514. MQTTProperties props = MQTTProperties_initializer;
  515. sprintf(buf, "QoS %d message", i);
  516. pubmsg.payload = buf;
  517. pubmsg.payloadlen = (int)strlen(pubmsg.payload) + 1;
  518. pubmsg.qos = i;
  519. pubmsg.retained = 0;
  520. MyLog(LOGA_DEBUG, "Sending qos %d message", pubmsg.qos);
  521. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  522. property.value.data.data = "test user property";
  523. property.value.data.len = (int)strlen(property.value.data.data);
  524. property.value.value.data = "test user property value";
  525. property.value.value.len = (int)strlen(property.value.value.data);
  526. MQTTProperties_add(&props, &property);
  527. pubmsg.properties = props;
  528. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  529. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  530. MQTTProperties_free(&props);
  531. MyLog(LOGA_DEBUG, "Sent qos %d message, token %d", pubmsg.qos, opts.token);
  532. }
  533. assert3PendingTokens(c);
  534. /* destroy and recreate to read from persistence */
  535. MyLog(LOGA_DEBUG, "Destroy and recreate client c");
  536. MQTTAsync_destroy(&c);
  537. createOptions.sendWhileDisconnected = 1;
  538. createOptions.MQTTVersion = MQTTVERSION_5;
  539. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc,
  540. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOptions);
  541. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  542. if (rc != MQTTASYNC_SUCCESS)
  543. {
  544. MQTTAsync_destroy(&c);
  545. goto exit;
  546. }
  547. assert3PendingTokens(c);
  548. rc = MQTTAsync_setConnected(c, c, test1cConnected);
  549. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  550. opts.will = &wopts;
  551. opts.onSuccess5 = test1cOnConnect;
  552. opts.onFailure5 = test1cOnFailure;
  553. opts.context = c;
  554. opts.cleanstart = 0;
  555. MyLog(LOGA_DEBUG, "Reconnecting client c");
  556. test1c_connected = 0;
  557. rc = MQTTAsync_connect(c, &opts);
  558. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  559. MQTTProperties_free(&props);
  560. if (rc != MQTTASYNC_SUCCESS)
  561. {
  562. failures++;
  563. goto exit;
  564. }
  565. /* wait for client to be reconnected */
  566. while (!test1c_connected && ++count < 10000)
  567. MySleep(100);
  568. /* wait for messages to be received */
  569. while (test1_messages_received < 3 && ++count < 10000)
  570. MySleep(100);
  571. waitForNoPendingTokens(c);
  572. rc = MQTTAsync_disconnect(c, NULL);
  573. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  574. rc = MQTTAsync_disconnect(d, NULL);
  575. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  576. exit:
  577. MQTTAsync_destroy(&c);
  578. MQTTAsync_destroy(&d);
  579. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  580. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  581. write_test_result();
  582. return failures;
  583. }
  584. /*********************************************************************
  585. Test2: offline buffering - sending messages while disconnected
  586. 1. call connect
  587. 2. use proxy to disconnect the client
  588. 3. while the client is disconnected, send more messages
  589. 4. when the client reconnects, check that those messages are sent
  590. *********************************************************************/
  591. int test2_will_message_received = 0;
  592. int test2_messages_received = 0;
  593. int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  594. {
  595. MQTTAsync c = (MQTTAsync)context;
  596. static int message_count = 0;
  597. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  598. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  599. test2_will_message_received = 1;
  600. else
  601. test2_messages_received++;
  602. MQTTAsync_freeMessage(&message);
  603. MQTTAsync_free(topicName);
  604. return 1;
  605. }
  606. int test2Finished = 0;
  607. int test2OnFailureCalled = 0;
  608. void test2cOnFailure(void* context, MQTTAsync_failureData5* response)
  609. {
  610. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  611. test2OnFailureCalled++;
  612. test2Finished = 1;
  613. }
  614. void test2dOnFailure(void* context, MQTTAsync_failureData5* response)
  615. {
  616. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  617. test2OnFailureCalled++;
  618. test2Finished = 1;
  619. }
  620. void test2cOnConnect(void* context, MQTTAsync_successData5* response)
  621. {
  622. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  623. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  624. MQTTAsync c = (MQTTAsync)context;
  625. int rc;
  626. /* send a message to the proxy to break the connection */
  627. pubmsg.payload = "TERMINATE";
  628. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  629. pubmsg.qos = 0;
  630. pubmsg.retained = 0;
  631. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  632. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  633. }
  634. int test2dReady = 0;
  635. char willTopic[100];
  636. char test_topic[100];
  637. void test2donSubscribe(void* context, MQTTAsync_successData5* response)
  638. {
  639. MQTTAsync c = (MQTTAsync)context;
  640. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  641. response->reasonCode);
  642. test2dReady = 1;
  643. }
  644. void test2dOnConnect(void* context, MQTTAsync_successData5* response)
  645. {
  646. MQTTAsync c = (MQTTAsync)context;
  647. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  648. int rc;
  649. int qoss[2] = {2, 2};
  650. char* topics[2] = {willTopic, test_topic};
  651. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  652. opts.onSuccess5 = test2donSubscribe;
  653. opts.context = c;
  654. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  655. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  656. if (rc != MQTTASYNC_SUCCESS)
  657. test2Finished = 1;
  658. }
  659. int test2c_connected = 0;
  660. void test2cConnected(void* context, char* cause)
  661. {
  662. MQTTAsync c = (MQTTAsync)context;
  663. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  664. test2c_connected = 1;
  665. }
  666. int test2(struct Options options)
  667. {
  668. char* testname = "test2";
  669. int subsqos = 2;
  670. MQTTAsync c, d;
  671. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  672. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  673. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  674. int rc = 0;
  675. int count = 0;
  676. char clientidc[70];
  677. char clientidd[70];
  678. int i = 0;
  679. char *URIs[2] = {"rubbish", options.proxy_connection};
  680. MQTTProperties props = MQTTProperties_initializer;
  681. MQTTProperty property;
  682. sprintf(willTopic, "paho-test95-2-%s", unique);
  683. sprintf(clientidc, "paho-test95-2-c-%s", unique);
  684. sprintf(clientidd, "paho-test95-2-d-%s", unique);
  685. sprintf(test_topic, "paho-test95-2-test topic %s", unique);
  686. test2Finished = 0;
  687. failures = 0;
  688. MyLog(LOGA_INFO, "Starting Offline buffering 2 - messages while disconnected with serverURIs");
  689. fprintf(xml, "<testcase classname=\"test2\" name=\"%s\"", testname);
  690. global_start_time = start_clock();
  691. createOptions.sendWhileDisconnected = 1;
  692. createOptions.MQTTVersion = MQTTVERSION_5;
  693. rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  694. NULL, &createOptions);
  695. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  696. if (rc != MQTTASYNC_SUCCESS)
  697. {
  698. MQTTAsync_destroy(&c);
  699. goto exit;
  700. }
  701. createOptions.sendWhileDisconnected = 0;
  702. createOptions.MQTTVersion = MQTTVERSION_5;
  703. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  704. NULL, &createOptions);
  705. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  706. if (rc != MQTTASYNC_SUCCESS)
  707. {
  708. MQTTAsync_destroy(&c);
  709. goto exit;
  710. }
  711. opts.keepAliveInterval = 20;
  712. opts.cleanstart = 1;
  713. opts.MQTTVersion = MQTTVERSION_5;
  714. rc = MQTTAsync_setCallbacks(d, d, NULL, test2_messageArrived, NULL);
  715. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  716. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  717. opts.context = d;
  718. opts.onSuccess5 = test2dOnConnect;
  719. opts.onFailure5 = test2dOnFailure;
  720. MyLog(LOGA_DEBUG, "Connecting client d");
  721. rc = MQTTAsync_connect(d, &opts);
  722. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  723. if (rc != MQTTASYNC_SUCCESS)
  724. {
  725. failures++;
  726. goto exit;
  727. }
  728. /* wait until d is ready: connected and subscribed */
  729. count = 0;
  730. while (!test2dReady && ++count < 10000)
  731. MySleep(100);
  732. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  733. rc = MQTTAsync_setConnected(c, c, test2cConnected);
  734. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  735. /* let client c go: connect, and send disconnect command to proxy */
  736. opts.will = &wopts;
  737. opts.will->message = "will message";
  738. opts.will->qos = 1;
  739. opts.will->retained = 0;
  740. opts.will->topicName = willTopic;
  741. opts.onSuccess5 = test2cOnConnect;
  742. opts.onFailure5 = test2cOnFailure;
  743. opts.context = c;
  744. opts.MQTTVersion = MQTTVERSION_5;
  745. opts.cleanstart = 0;
  746. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  747. property.value.integer4 = 30;
  748. MQTTProperties_add(&props, &property);
  749. opts.connectProperties = &props;
  750. opts.serverURIs = URIs;
  751. opts.serverURIcount = 2;
  752. MyLog(LOGA_DEBUG, "Connecting client c");
  753. rc = MQTTAsync_connect(c, &opts);
  754. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  755. if (rc != MQTTASYNC_SUCCESS)
  756. {
  757. failures++;
  758. goto exit;
  759. }
  760. MQTTProperties_free(&props);
  761. /* wait for will message */
  762. while (!test2_will_message_received && ++count < 10000)
  763. MySleep(100);
  764. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  765. test2c_connected = 0;
  766. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  767. for (i = 0; i < 3; ++i)
  768. {
  769. char buf[50];
  770. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  771. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  772. sprintf(buf, "QoS %d message", i);
  773. pubmsg.payload = buf;
  774. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  775. pubmsg.qos = i;
  776. pubmsg.retained = 0;
  777. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  778. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  779. }
  780. assert3PendingTokens(c);
  781. rc = MQTTAsync_reconnect(c);
  782. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  783. /* wait for client to be reconnected */
  784. while (!test2c_connected && ++count < 10000)
  785. MySleep(100);
  786. /* wait for success or failure callback */
  787. while (test2_messages_received < 3 && ++count < 10000)
  788. MySleep(100);
  789. waitForNoPendingTokens(c);
  790. rc = MQTTAsync_disconnect(c, NULL);
  791. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  792. rc = MQTTAsync_disconnect(d, NULL);
  793. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  794. exit:
  795. MySleep(200);
  796. MQTTAsync_destroy(&c);
  797. MQTTAsync_destroy(&d);
  798. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  799. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  800. write_test_result();
  801. return failures;
  802. }
  803. /*********************************************************************
  804. test3: offline buffering - sending messages while disconnected
  805. 1. call connect
  806. 2. use proxy to disconnect the client
  807. 3. while the client is disconnected, send more messages
  808. 4. when the client auto reconnects, check that those messages are sent
  809. *********************************************************************/
  810. int test3_will_message_received = 0;
  811. int test3_messages_received = 0;
  812. int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  813. {
  814. MQTTAsync c = (MQTTAsync)context;
  815. static int message_count = 0;
  816. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  817. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  818. test3_will_message_received = 1;
  819. else
  820. test3_messages_received++;
  821. MQTTAsync_freeMessage(&message);
  822. MQTTAsync_free(topicName);
  823. return 1;
  824. }
  825. int test3Finished = 0;
  826. int test3OnFailureCalled = 0;
  827. void test3cOnFailure(void* context, MQTTAsync_failureData5* response)
  828. {
  829. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  830. test3OnFailureCalled++;
  831. test3Finished = 1;
  832. }
  833. void test3dOnFailure(void* context, MQTTAsync_failureData5* response)
  834. {
  835. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  836. test3OnFailureCalled++;
  837. test3Finished = 1;
  838. }
  839. void test3cOnConnect(void* context, MQTTAsync_successData5* response)
  840. {
  841. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  842. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  843. MQTTAsync c = (MQTTAsync)context;
  844. int rc;
  845. /* send a message to the proxy to break the connection */
  846. pubmsg.payload = "TERMINATE";
  847. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  848. pubmsg.qos = 0;
  849. pubmsg.retained = 0;
  850. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  851. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  852. }
  853. int test3dReady = 0;
  854. char willTopic[100];
  855. char test_topic[100];
  856. void test3donSubscribe(void* context, MQTTAsync_successData5* response)
  857. {
  858. MQTTAsync c = (MQTTAsync)context;
  859. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  860. response->reasonCode);
  861. test3dReady = 1;
  862. }
  863. void test3dOnConnect(void* context, MQTTAsync_successData5* response)
  864. {
  865. MQTTAsync c = (MQTTAsync)context;
  866. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  867. int rc;
  868. int qoss[2] = {2, 2};
  869. char* topics[2] = {willTopic, test_topic};
  870. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  871. opts.onSuccess5 = test3donSubscribe;
  872. opts.context = c;
  873. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  874. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  875. if (rc != MQTTASYNC_SUCCESS)
  876. test3Finished = 1;
  877. }
  878. int test3c_connected = 0;
  879. void test3cConnected(void* context, char* cause)
  880. {
  881. MQTTAsync c = (MQTTAsync)context;
  882. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  883. test3c_connected = 1;
  884. }
  885. int test3(struct Options options)
  886. {
  887. char* testname = "test3";
  888. int subsqos = 2;
  889. MQTTAsync c, d;
  890. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  891. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  892. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  893. int rc = 0;
  894. int count = 0;
  895. char clientidc[70];
  896. char clientidd[70];
  897. int i = 0;
  898. MQTTProperties props = MQTTProperties_initializer;
  899. MQTTProperty property;
  900. sprintf(willTopic, "paho-test95-3-%s", unique);
  901. sprintf(clientidc, "paho-test95-3-c-%s", unique);
  902. sprintf(clientidd, "paho-test95-3-d-%s", unique);
  903. sprintf(test_topic, "paho-test95-3-test topic %s", unique);
  904. test3Finished = 0;
  905. failures = 0;
  906. MyLog(LOGA_INFO, "Starting Offline buffering 3 - messages while disconnected");
  907. fprintf(xml, "<testcase classname=\"test3\" name=\"%s\"", testname);
  908. global_start_time = start_clock();
  909. createOptions.sendWhileDisconnected = 1;
  910. createOptions.MQTTVersion = MQTTVERSION_5;
  911. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  912. NULL, &createOptions);
  913. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  914. if (rc != MQTTASYNC_SUCCESS)
  915. {
  916. MQTTAsync_destroy(&c);
  917. goto exit;
  918. }
  919. createOptions.sendWhileDisconnected = 0;
  920. createOptions.MQTTVersion = MQTTVERSION_5;
  921. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  922. NULL, &createOptions);
  923. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  924. if (rc != MQTTASYNC_SUCCESS)
  925. {
  926. MQTTAsync_destroy(&c);
  927. goto exit;
  928. }
  929. opts.keepAliveInterval = 20;
  930. opts.cleanstart = 1;
  931. opts.MQTTVersion = MQTTVERSION_5;
  932. rc = MQTTAsync_setCallbacks(d, d, NULL, test3_messageArrived, NULL);
  933. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  934. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  935. opts.context = d;
  936. opts.onSuccess5 = test3dOnConnect;
  937. opts.onFailure5 = test3dOnFailure;
  938. MyLog(LOGA_DEBUG, "Connecting client d");
  939. rc = MQTTAsync_connect(d, &opts);
  940. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  941. if (rc != MQTTASYNC_SUCCESS)
  942. {
  943. failures++;
  944. goto exit;
  945. }
  946. /* wait until d is ready: connected and subscribed */
  947. count = 0;
  948. while (!test3dReady && ++count < 10000)
  949. MySleep(100);
  950. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  951. rc = MQTTAsync_setConnected(c, c, test3cConnected);
  952. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  953. /* let client c go: connect, and send disconnect command to proxy */
  954. opts.will = &wopts;
  955. opts.will->message = "will message";
  956. opts.will->qos = 1;
  957. opts.will->retained = 0;
  958. opts.will->topicName = willTopic;
  959. opts.onSuccess5 = test3cOnConnect;
  960. opts.onFailure5 = test3cOnFailure;
  961. opts.context = c;
  962. opts.cleanstart = 0;
  963. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  964. property.value.integer4 = 30;
  965. MQTTProperties_add(&props, &property);
  966. opts.connectProperties = &props;
  967. opts.MQTTVersion = MQTTVERSION_5;
  968. opts.automaticReconnect = 1;
  969. MyLog(LOGA_DEBUG, "Connecting client c");
  970. rc = MQTTAsync_connect(c, &opts);
  971. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  972. MQTTProperties_free(&props);
  973. if (rc != MQTTASYNC_SUCCESS)
  974. {
  975. failures++;
  976. goto exit;
  977. }
  978. /* wait for will message */
  979. while (!test3_will_message_received && ++count < 10000)
  980. MySleep(100);
  981. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  982. test3c_connected = 0;
  983. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  984. for (i = 0; i < 3; ++i)
  985. {
  986. char buf[50];
  987. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  988. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  989. sprintf(buf, "QoS %d message", i);
  990. pubmsg.payload = buf;
  991. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  992. pubmsg.qos = i;
  993. pubmsg.retained = 0;
  994. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  995. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  996. }
  997. assert3PendingTokens(c);
  998. /* wait for client to be reconnected */
  999. while (!test3c_connected && ++count < 10000)
  1000. MySleep(100);
  1001. /* wait for success or failure callback */
  1002. while (test3_messages_received < 3 && ++count < 10000)
  1003. MySleep(100);
  1004. waitForNoPendingTokens(c);
  1005. rc = MQTTAsync_disconnect(c, NULL);
  1006. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1007. rc = MQTTAsync_disconnect(d, NULL);
  1008. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1009. exit:
  1010. MySleep(200);
  1011. MQTTAsync_destroy(&c);
  1012. MQTTAsync_destroy(&d);
  1013. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1014. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1015. write_test_result();
  1016. return failures;
  1017. }
  1018. /*********************************************************************
  1019. test4: offline buffering - sending messages while disconnected
  1020. 1. call connect
  1021. 2. use proxy to disconnect the client
  1022. 3. while the client is disconnected, send more messages
  1023. 4. when the client auto reconnects, check that those messages are sent
  1024. *********************************************************************/
  1025. int test4_will_message_received = 0;
  1026. int test4_messages_received = 0;
  1027. int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1028. {
  1029. MQTTAsync c = (MQTTAsync)context;
  1030. static int message_count = 0;
  1031. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen,
  1032. message->payload);
  1033. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  1034. test4_will_message_received = 1;
  1035. else
  1036. test4_messages_received++;
  1037. MQTTAsync_freeMessage(&message);
  1038. MQTTAsync_free(topicName);
  1039. return 1;
  1040. }
  1041. int test4Finished = 0;
  1042. int test4OnFailureCalled = 0;
  1043. void test4cOnFailure(void* context, MQTTAsync_failureData5* response)
  1044. {
  1045. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1046. test4OnFailureCalled++;
  1047. test4Finished = 1;
  1048. }
  1049. void test4dOnFailure(void* context, MQTTAsync_failureData5* response)
  1050. {
  1051. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1052. test4OnFailureCalled++;
  1053. test4Finished = 1;
  1054. }
  1055. void test4cOnConnect(void* context, MQTTAsync_successData5* response)
  1056. {
  1057. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1058. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  1059. MQTTAsync c = (MQTTAsync)context;
  1060. int rc;
  1061. /* send a message to the proxy to break the connection */
  1062. pubmsg.payload = "TERMINATE";
  1063. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  1064. pubmsg.qos = 0;
  1065. pubmsg.retained = 0;
  1066. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  1067. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1068. }
  1069. int test4dReady = 0;
  1070. char willTopic[100];
  1071. char test_topic[100];
  1072. void test4donSubscribe(void* context, MQTTAsync_successData5* response)
  1073. {
  1074. MQTTAsync c = (MQTTAsync)context;
  1075. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  1076. response->reasonCode);
  1077. test4dReady = 1;
  1078. }
  1079. void test4dOnConnect(void* context, MQTTAsync_successData5* response)
  1080. {
  1081. MQTTAsync c = (MQTTAsync)context;
  1082. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1083. int rc;
  1084. int qoss[2] = {2, 2};
  1085. char* topics[2] = {willTopic, test_topic};
  1086. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1087. opts.onSuccess5 = test4donSubscribe;
  1088. opts.context = c;
  1089. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  1090. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1091. if (rc != MQTTASYNC_SUCCESS)
  1092. test4Finished = 1;
  1093. }
  1094. int test4c_connected = 0;
  1095. void test4cConnected(void* context, char* cause)
  1096. {
  1097. MQTTAsync c = (MQTTAsync)context;
  1098. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  1099. test4c_connected = 1;
  1100. }
  1101. int test4(struct Options options)
  1102. {
  1103. char* testname = "test4";
  1104. int subsqos = 2;
  1105. MQTTAsync c, d;
  1106. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  1107. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1108. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1109. int rc = 0;
  1110. int count = 0;
  1111. char clientidc[70];
  1112. char clientidd[70];
  1113. int i = 0;
  1114. char *URIs[2] = {"rubbish", options.proxy_connection};
  1115. MQTTProperties props = MQTTProperties_initializer;
  1116. MQTTProperty property;
  1117. sprintf(willTopic, "paho-test95-4-%s", unique);
  1118. sprintf(clientidc, "paho-test95-4-c-%s", unique);
  1119. sprintf(clientidd, "paho-test95-4-d-%s", unique);
  1120. sprintf(test_topic, "paho-test95-4-test topic %s", unique);
  1121. test4Finished = 0;
  1122. failures = 0;
  1123. MyLog(LOGA_INFO, "Starting Offline buffering 4 - messages while disconnected with serverURIs");
  1124. fprintf(xml, "<testcase classname=\"test4\" name=\"%s\"", testname);
  1125. global_start_time = start_clock();
  1126. createOptions.sendWhileDisconnected = 1;
  1127. createOptions.MQTTVersion = MQTTVERSION_5;
  1128. rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1129. NULL, &createOptions);
  1130. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1131. if (rc != MQTTASYNC_SUCCESS)
  1132. {
  1133. MQTTAsync_destroy(&c);
  1134. goto exit;
  1135. }
  1136. createOptions.sendWhileDisconnected = 0;
  1137. createOptions.MQTTVersion = MQTTVERSION_5;
  1138. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1139. NULL, &createOptions);
  1140. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1141. if (rc != MQTTASYNC_SUCCESS)
  1142. {
  1143. MQTTAsync_destroy(&c);
  1144. goto exit;
  1145. }
  1146. opts.keepAliveInterval = 20;
  1147. opts.cleanstart = 1;
  1148. rc = MQTTAsync_setCallbacks(d, d, NULL, test4_messageArrived, NULL);
  1149. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1150. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1151. opts.context = d;
  1152. opts.onSuccess5 = test4dOnConnect;
  1153. opts.onFailure5 = test4dOnFailure;
  1154. MyLog(LOGA_DEBUG, "Connecting client d");
  1155. rc = MQTTAsync_connect(d, &opts);
  1156. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1157. if (rc != MQTTASYNC_SUCCESS)
  1158. {
  1159. failures++;
  1160. goto exit;
  1161. }
  1162. /* wait until d is ready: connected and subscribed */
  1163. count = 0;
  1164. while (!test4dReady && ++count < 10000)
  1165. MySleep(100);
  1166. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1167. rc = MQTTAsync_setConnected(c, c, test4cConnected);
  1168. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1169. /* let client c go: connect, and send disconnect command to proxy */
  1170. opts.will = &wopts;
  1171. opts.will->message = "will message";
  1172. opts.will->qos = 1;
  1173. opts.will->retained = 0;
  1174. opts.will->topicName = willTopic;
  1175. opts.onSuccess5 = test4cOnConnect;
  1176. opts.onFailure5 = test4cOnFailure;
  1177. opts.context = c;
  1178. opts.cleanstart = 0;
  1179. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  1180. property.value.integer4 = 30;
  1181. MQTTProperties_add(&props, &property);
  1182. opts.connectProperties = &props;
  1183. opts.serverURIs = URIs;
  1184. opts.serverURIcount = 2;
  1185. opts.automaticReconnect = 1;
  1186. MyLog(LOGA_DEBUG, "Connecting client c");
  1187. rc = MQTTAsync_connect(c, &opts);
  1188. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1189. MQTTProperties_free(&props);
  1190. if (rc != MQTTASYNC_SUCCESS)
  1191. {
  1192. failures++;
  1193. goto exit;
  1194. }
  1195. /* wait for will message */
  1196. while (!test4_will_message_received && ++count < 10000)
  1197. MySleep(100);
  1198. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1199. test4c_connected = 0;
  1200. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1201. for (i = 0; i < 3; ++i)
  1202. {
  1203. char buf[50];
  1204. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1205. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1206. sprintf(buf, "QoS %d message", i);
  1207. pubmsg.payload = buf;
  1208. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1209. pubmsg.qos = i;
  1210. pubmsg.retained = 0;
  1211. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1212. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1213. }
  1214. assert3PendingTokens(c);
  1215. /* wait for client to be reconnected */
  1216. while (!test4c_connected && ++count < 10000)
  1217. MySleep(100);
  1218. /* wait for success or failure callback */
  1219. while (test4_messages_received < 3 && ++count < 10000)
  1220. MySleep(100);
  1221. waitForNoPendingTokens(c);
  1222. rc = MQTTAsync_disconnect(c, NULL);
  1223. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1224. rc = MQTTAsync_disconnect(d, NULL);
  1225. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1226. exit:
  1227. MySleep(200);
  1228. MQTTAsync_destroy(&c);
  1229. MQTTAsync_destroy(&d);
  1230. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1231. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1232. write_test_result();
  1233. return failures;
  1234. }
  1235. /*********************************************************************
  1236. test5: offline buffering - check max buffered
  1237. 1. call connect
  1238. 2. use proxy to disconnect the client
  1239. 3. while the client is disconnected, send more messages
  1240. 4. when the client reconnects, check that those messages are sent
  1241. *********************************************************************/
  1242. int test5_will_message_received = 0;
  1243. int test5_messages_received = 0;
  1244. int test5Finished = 0;
  1245. int test5OnFailureCalled = 0;
  1246. int test5c_connected = 0;
  1247. int test5_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1248. {
  1249. MQTTAsync c = (MQTTAsync)context;
  1250. static int message_count = 0;
  1251. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1252. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  1253. test5_will_message_received = 1;
  1254. else
  1255. test5_messages_received++;
  1256. MQTTAsync_freeMessage(&message);
  1257. MQTTAsync_free(topicName);
  1258. return 1;
  1259. }
  1260. void test5cOnFailure(void* context, MQTTAsync_failureData5* response)
  1261. {
  1262. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1263. test5OnFailureCalled++;
  1264. test5Finished = 1;
  1265. }
  1266. void test5dOnFailure(void* context, MQTTAsync_failureData5* response)
  1267. {
  1268. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1269. test5OnFailureCalled++;
  1270. test5Finished = 1;
  1271. }
  1272. void test5cOnConnect(void* context, MQTTAsync_successData5* response)
  1273. {
  1274. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1275. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  1276. MQTTAsync c = (MQTTAsync)context;
  1277. int rc;
  1278. /* send a message to the proxy to break the connection */
  1279. pubmsg.payload = "TERMINATE";
  1280. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  1281. pubmsg.qos = 0;
  1282. pubmsg.retained = 0;
  1283. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  1284. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1285. }
  1286. int test5dReady = 0;
  1287. char willTopic[100];
  1288. char test_topic[100];
  1289. void test5donSubscribe(void* context, MQTTAsync_successData5* response)
  1290. {
  1291. MQTTAsync c = (MQTTAsync)context;
  1292. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  1293. response->reasonCode);
  1294. test5dReady = 1;
  1295. }
  1296. void test5dOnConnect(void* context, MQTTAsync_successData5* response)
  1297. {
  1298. MQTTAsync c = (MQTTAsync)context;
  1299. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1300. int rc;
  1301. int qoss[2] = {2, 2};
  1302. char* topics[2] = {willTopic, test_topic};
  1303. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1304. opts.onSuccess5 = test5donSubscribe;
  1305. opts.context = c;
  1306. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  1307. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1308. if (rc != MQTTASYNC_SUCCESS)
  1309. test5Finished = 1;
  1310. }
  1311. void test5cConnected(void* context, char* cause)
  1312. {
  1313. MQTTAsync c = (MQTTAsync)context;
  1314. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  1315. test5c_connected = 1;
  1316. }
  1317. int test5(struct Options options)
  1318. {
  1319. char* testname = "test5";
  1320. int subsqos = 2;
  1321. MQTTAsync c, d;
  1322. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  1323. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1324. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1325. int rc = 0;
  1326. int count = 0;
  1327. char clientidc[70];
  1328. char clientidd[70];
  1329. int i = 0;
  1330. MQTTProperties props = MQTTProperties_initializer;
  1331. MQTTProperty property;
  1332. sprintf(willTopic, "paho-test95-5-%s", unique);
  1333. sprintf(clientidc, "paho-test95-5-c-%s", unique);
  1334. sprintf(clientidd, "paho-test95-5-d-%s", unique);
  1335. sprintf(test_topic, "paho-test95-5-test topic %s", unique);
  1336. test5Finished = 0;
  1337. failures = 0;
  1338. MyLog(LOGA_INFO, "Starting Offline buffering 5 - max buffered");
  1339. fprintf(xml, "<testcase classname=\"test5\" name=\"%s\"", testname);
  1340. global_start_time = start_clock();
  1341. createOptions.sendWhileDisconnected = 1;
  1342. createOptions.maxBufferedMessages = 3;
  1343. createOptions.MQTTVersion = MQTTVERSION_5;
  1344. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1345. NULL, &createOptions);
  1346. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1347. if (rc != MQTTASYNC_SUCCESS)
  1348. {
  1349. MQTTAsync_destroy(&c);
  1350. goto exit;
  1351. }
  1352. createOptions.sendWhileDisconnected = 0;
  1353. createOptions.maxBufferedMessages = 1;
  1354. createOptions.MQTTVersion = MQTTVERSION_5;
  1355. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1356. NULL, &createOptions);
  1357. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1358. if (rc != MQTTASYNC_SUCCESS)
  1359. {
  1360. MQTTAsync_destroy(&c);
  1361. goto exit;
  1362. }
  1363. opts.keepAliveInterval = 20;
  1364. opts.cleanstart = 1;
  1365. rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
  1366. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1367. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1368. opts.context = d;
  1369. opts.onSuccess5 = test5dOnConnect;
  1370. opts.onFailure5 = test5dOnFailure;
  1371. opts.MQTTVersion = MQTTVERSION_5;
  1372. MyLog(LOGA_DEBUG, "Connecting client d");
  1373. rc = MQTTAsync_connect(d, &opts);
  1374. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1375. if (rc != MQTTASYNC_SUCCESS)
  1376. {
  1377. failures++;
  1378. goto exit;
  1379. }
  1380. /* wait until d is ready: connected and subscribed */
  1381. count = 0;
  1382. while (!test5dReady && ++count < 10000)
  1383. MySleep(100);
  1384. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1385. rc = MQTTAsync_setConnected(c, c, test5cConnected);
  1386. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1387. /* let client c go: connect, and send disconnect command to proxy */
  1388. opts.will = &wopts;
  1389. opts.will->message = "will message";
  1390. opts.will->qos = 1;
  1391. opts.will->retained = 0;
  1392. opts.will->topicName = willTopic;
  1393. opts.onSuccess5 = test5cOnConnect;
  1394. opts.onFailure5 = test5cOnFailure;
  1395. opts.context = c;
  1396. opts.cleanstart = 0;
  1397. opts.MQTTVersion = MQTTVERSION_5;
  1398. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  1399. property.value.integer4 = 30;
  1400. MQTTProperties_add(&props, &property);
  1401. opts.connectProperties = &props;
  1402. MyLog(LOGA_DEBUG, "Connecting client c");
  1403. rc = MQTTAsync_connect(c, &opts);
  1404. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1405. MQTTProperties_free(&props);
  1406. if (rc != MQTTASYNC_SUCCESS)
  1407. {
  1408. failures++;
  1409. goto exit;
  1410. }
  1411. /* wait for will message */
  1412. while (!test5_will_message_received && ++count < 10000)
  1413. MySleep(100);
  1414. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1415. test5c_connected = 0;
  1416. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1417. for (i = 0; i < 5; ++i)
  1418. {
  1419. char buf[50];
  1420. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1421. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1422. sprintf(buf, "QoS %d message", i);
  1423. pubmsg.payload = buf;
  1424. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1425. pubmsg.qos = i % 3;
  1426. pubmsg.retained = 0;
  1427. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1428. if (i <= 2)
  1429. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1430. else
  1431. assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
  1432. }
  1433. assert3PendingTokens(c);
  1434. rc = MQTTAsync_reconnect(c);
  1435. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1436. /* wait for client to be reconnected */
  1437. while (!test5c_connected && ++count < 10000)
  1438. MySleep(100);
  1439. /* wait for success or failure callback */
  1440. while (test5_messages_received < 3 && ++count < 10000)
  1441. MySleep(100);
  1442. waitForNoPendingTokens(c);
  1443. rc = MQTTAsync_disconnect(c, NULL);
  1444. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1445. rc = MQTTAsync_disconnect(d, NULL);
  1446. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1447. exit:
  1448. MySleep(200);
  1449. MQTTAsync_destroy(&c);
  1450. MQTTAsync_destroy(&d);
  1451. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1452. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1453. write_test_result();
  1454. return failures;
  1455. }
  1456. int test6(struct Options options)
  1457. {
  1458. char* testname = "test6";
  1459. int subsqos = 2;
  1460. MQTTAsync c, d;
  1461. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  1462. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1463. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1464. int rc = 0;
  1465. int count = 0;
  1466. char clientidc[70];
  1467. char clientidd[70];
  1468. int i = 0;
  1469. MQTTProperties props = MQTTProperties_initializer;
  1470. MQTTProperty property;
  1471. test5_will_message_received = 0;
  1472. test5_messages_received = 0;
  1473. test5Finished = 0;
  1474. test5OnFailureCalled = 0;
  1475. test5c_connected = 0;
  1476. sprintf(willTopic, "paho-test95-6-%s", unique);
  1477. sprintf(clientidc, "paho-test95-6-c-%s", unique);
  1478. sprintf(clientidd, "paho-test95-6-d-%s", unique);
  1479. sprintf(test_topic, "paho-test95-6-test topic %s", unique);
  1480. test5Finished = 0;
  1481. failures = 0;
  1482. MyLog(LOGA_INFO, "Starting Offline buffering 6 - max buffered with binary will");
  1483. fprintf(xml, "<testcase classname=\"test6\" name=\"%s\"", testname);
  1484. global_start_time = start_clock();
  1485. createOptions.sendWhileDisconnected = 1;
  1486. createOptions.maxBufferedMessages = 3;
  1487. createOptions.MQTTVersion = MQTTVERSION_5;
  1488. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1489. NULL, &createOptions);
  1490. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1491. if (rc != MQTTASYNC_SUCCESS)
  1492. {
  1493. MQTTAsync_destroy(&c);
  1494. goto exit;
  1495. }
  1496. createOptions.sendWhileDisconnected = 0;
  1497. createOptions.maxBufferedMessages = 1;
  1498. createOptions.MQTTVersion = MQTTVERSION_5;
  1499. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1500. NULL, &createOptions);
  1501. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1502. if (rc != MQTTASYNC_SUCCESS)
  1503. {
  1504. MQTTAsync_destroy(&c);
  1505. goto exit;
  1506. }
  1507. opts.keepAliveInterval = 20;
  1508. opts.cleanstart = 1;
  1509. opts.MQTTVersion = MQTTVERSION_5;
  1510. rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
  1511. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1512. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1513. opts.context = d;
  1514. opts.onSuccess5 = test5dOnConnect;
  1515. opts.onFailure5 = test5dOnFailure;
  1516. MyLog(LOGA_DEBUG, "Connecting client d");
  1517. rc = MQTTAsync_connect(d, &opts);
  1518. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1519. if (rc != MQTTASYNC_SUCCESS)
  1520. {
  1521. failures++;
  1522. goto exit;
  1523. }
  1524. /* wait until d is ready: connected and subscribed */
  1525. count = 0;
  1526. while (!test5dReady && ++count < 10000)
  1527. MySleep(100);
  1528. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1529. rc = MQTTAsync_setConnected(c, c, test5cConnected);
  1530. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1531. /* let client c go: connect, and send disconnect command to proxy */
  1532. opts.will = &wopts;
  1533. opts.will->payload.data = "will message";
  1534. opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
  1535. opts.will->qos = 1;
  1536. opts.will->retained = 0;
  1537. opts.will->topicName = willTopic;
  1538. opts.onSuccess5 = test5cOnConnect;
  1539. opts.onFailure5 = test5cOnFailure;
  1540. opts.context = c;
  1541. opts.cleanstart = 0;
  1542. opts.MQTTVersion = MQTTVERSION_5;
  1543. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  1544. property.value.integer4 = 30;
  1545. MQTTProperties_add(&props, &property);
  1546. opts.connectProperties = &props;
  1547. MyLog(LOGA_DEBUG, "Connecting client c");
  1548. rc = MQTTAsync_connect(c, &opts);
  1549. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1550. MQTTProperties_free(&props);
  1551. if (rc != MQTTASYNC_SUCCESS)
  1552. {
  1553. failures++;
  1554. goto exit;
  1555. }
  1556. /* wait for will message */
  1557. while (!test5_will_message_received && ++count < 10000)
  1558. MySleep(100);
  1559. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1560. test5c_connected = 0;
  1561. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1562. for (i = 0; i < 5; ++i)
  1563. {
  1564. char buf[50];
  1565. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1566. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1567. sprintf(buf, "QoS %d message", i);
  1568. pubmsg.payload = buf;
  1569. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1570. pubmsg.qos = i % 3;
  1571. pubmsg.retained = 0;
  1572. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1573. if (i <= 2)
  1574. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1575. else
  1576. assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
  1577. }
  1578. assert3PendingTokens(c);
  1579. rc = MQTTAsync_reconnect(c);
  1580. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1581. /* wait for client to be reconnected */
  1582. while (!test5c_connected && ++count < 10000)
  1583. MySleep(100);
  1584. /* wait for success or failure callback */
  1585. while (test5_messages_received < 3 && ++count < 10000)
  1586. MySleep(100);
  1587. waitForNoPendingTokens(c);
  1588. rc = MQTTAsync_disconnect(c, NULL);
  1589. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1590. rc = MQTTAsync_disconnect(d, NULL);
  1591. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1592. exit:
  1593. MySleep(200);
  1594. MQTTAsync_destroy(&c);
  1595. MQTTAsync_destroy(&d);
  1596. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1597. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1598. write_test_result();
  1599. return failures;
  1600. }
  1601. /*********************************************************************
  1602. Test7: Fill up TCP buffer with QoS 0 messages
  1603. *********************************************************************/
  1604. int test7c_connected = 0;
  1605. int test7_will_message_received = 0;
  1606. int test7_messages_received = 0;
  1607. int test7Finished = 0;
  1608. int test7OnFailureCalled = 0;
  1609. int test7dReady = 0;
  1610. int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1611. {
  1612. MQTTAsync c = (MQTTAsync)context;
  1613. static int message_count = 0;
  1614. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1615. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  1616. test7_will_message_received = 1;
  1617. else
  1618. test7_messages_received++;
  1619. MQTTAsync_freeMessage(&message);
  1620. MQTTAsync_free(topicName);
  1621. return 1;
  1622. }
  1623. void test7cConnected(void* context, char* cause)
  1624. {
  1625. MQTTAsync c = (MQTTAsync)context;
  1626. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  1627. test7c_connected = 1;
  1628. }
  1629. void test7cOnConnectFailure(void* context, MQTTAsync_failureData5* response)
  1630. {
  1631. MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
  1632. test7OnFailureCalled++;
  1633. test7Finished = 1;
  1634. }
  1635. void test7cOnConnectSuccess(void* context, MQTTAsync_successData5* response)
  1636. {
  1637. MQTTAsync c = (MQTTAsync)context;
  1638. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1639. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1640. /* send a message to the proxy to break the connection */
  1641. pubmsg.payload = "TERMINATE";
  1642. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  1643. pubmsg.qos = 0;
  1644. pubmsg.retained = 0;
  1645. //rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  1646. //assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1647. }
  1648. void test7dOnConnectFailure(void* context, MQTTAsync_failureData5* response)
  1649. {
  1650. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1651. test7OnFailureCalled++;
  1652. test7Finished = 1;
  1653. }
  1654. void test7donSubscribe(void* context, MQTTAsync_successData5* response)
  1655. {
  1656. MQTTAsync c = (MQTTAsync)context;
  1657. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c,
  1658. response->reasonCode);
  1659. test7dReady = 1;
  1660. }
  1661. void test7dOnConnectSuccess(void* context, MQTTAsync_successData5* response)
  1662. {
  1663. MQTTAsync c = (MQTTAsync)context;
  1664. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1665. int qoss[2] = {2, 2};
  1666. char* topics[2] = {willTopic, test_topic};
  1667. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1668. opts.onSuccess5 = test7donSubscribe;
  1669. opts.context = c;
  1670. //rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  1671. //assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1672. //if (rc != MQTTASYNC_SUCCESS)
  1673. // test5Finished = 1;
  1674. test7dReady = 1;
  1675. }
  1676. int test7(struct Options options)
  1677. {
  1678. char* testname = "test7";
  1679. int subsqos = 2;
  1680. MQTTAsync c, d;
  1681. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  1682. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1683. MQTTAsync_createOptions createOpts = MQTTAsync_createOptions_initializer;
  1684. int rc = 0;
  1685. int count = 0;
  1686. char clientidc[70];
  1687. char clientidd[70];
  1688. int i = 0;
  1689. test7_will_message_received = 0;
  1690. test7_messages_received = 0;
  1691. test7Finished = 0;
  1692. test7OnFailureCalled = 0;
  1693. test7c_connected = 0;
  1694. sprintf(willTopic, "paho-test95-7-%s", unique);
  1695. sprintf(clientidc, "paho-test9-7-c-%s", unique);
  1696. sprintf(clientidd, "paho-test9-7-d-%s", unique);
  1697. sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
  1698. test7Finished = 0;
  1699. failures = 0;
  1700. MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
  1701. fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
  1702. global_start_time = start_clock();
  1703. createOpts.MQTTVersion = MQTTVERSION_5;
  1704. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1705. NULL, &createOpts);
  1706. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1707. if (rc != MQTTASYNC_SUCCESS)
  1708. {
  1709. MQTTAsync_destroy(&c);
  1710. goto exit;
  1711. }
  1712. createOpts.MQTTVersion = MQTTVERSION_5;
  1713. rc = MQTTAsync_createWithOptions(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1714. NULL, &createOpts);
  1715. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1716. if (rc != MQTTASYNC_SUCCESS)
  1717. {
  1718. MQTTAsync_destroy(&c);
  1719. goto exit;
  1720. }
  1721. opts.keepAliveInterval = 20;
  1722. opts.cleansession = 1;
  1723. rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
  1724. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1725. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1726. opts.context = d;
  1727. opts.onSuccess5 = test7dOnConnectSuccess;
  1728. opts.onFailure5 = test7dOnConnectFailure;
  1729. MyLog(LOGA_DEBUG, "Connecting client d");
  1730. rc = MQTTAsync_connect(d, &opts);
  1731. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1732. if (rc != MQTTASYNC_SUCCESS)
  1733. {
  1734. failures++;
  1735. goto exit;
  1736. }
  1737. /* wait until d is ready: connected and subscribed */
  1738. count = 0;
  1739. while (!test7dReady && ++count < 10000)
  1740. {
  1741. if (test7Finished)
  1742. goto exit;
  1743. MySleep(100);
  1744. }
  1745. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1746. rc = MQTTAsync_setConnected(c, c, test7cConnected);
  1747. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1748. /* let client c go: connect, and send disconnect command to proxy */
  1749. opts.will = &wopts;
  1750. opts.will->payload.data = "will message";
  1751. opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
  1752. opts.will->qos = 1;
  1753. opts.will->retained = 0;
  1754. opts.will->topicName = willTopic;
  1755. opts.onSuccess5 = test7cOnConnectSuccess;
  1756. opts.onFailure5 = test7cOnConnectFailure;
  1757. opts.context = c;
  1758. opts.cleansession = 0;
  1759. /*opts.automaticReconnect = 1;
  1760. opts.minRetryInterval = 3;
  1761. opts.maxRetryInterval = 6;*/
  1762. MyLog(LOGA_DEBUG, "Connecting client c");
  1763. rc = MQTTAsync_connect(c, &opts);
  1764. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1765. if (rc != MQTTASYNC_SUCCESS)
  1766. {
  1767. failures++;
  1768. goto exit;
  1769. }
  1770. count = 0;
  1771. while (!test7c_connected && ++count < 10000)
  1772. MySleep(100);
  1773. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1774. /* wait for will message */
  1775. //while (test7_will_message_received == 0 && ++count < 10000)
  1776. // MySleep(100);
  1777. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
  1778. test7c_connected = 0;
  1779. char buf[5000000];
  1780. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1781. for (i = 0; i < 50000; ++i)
  1782. {
  1783. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1784. MQTTAsync_responseOptions pubopts = MQTTAsync_responseOptions_initializer;
  1785. pubmsg.qos = 0; /*i % 3;*/
  1786. sprintf(buf, "QoS %d message", pubmsg.qos);
  1787. pubmsg.payload = buf;
  1788. pubmsg.payloadlen = 5000000; //(int)(strlen(pubmsg.payload) + 1);
  1789. pubmsg.retained = 0;
  1790. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
  1791. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  1792. if (rc != 0)
  1793. {
  1794. //MyLog(LOGA_DEBUG, "Connecting client c");
  1795. //rc = MQTTAsync_connect(c, &opts);
  1796. //MySleep(1000);
  1797. break;
  1798. }
  1799. }
  1800. #if 0
  1801. assert3PendingTokens(c);
  1802. rc = MQTTAsync_reconnect(c);
  1803. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1804. /* wait for client to be reconnected */
  1805. while (!test5c_connected && ++count < 10000)
  1806. MySleep(100);
  1807. /* wait for success or failure callback */
  1808. while (test5_messages_received < 3 && ++count < 10000)
  1809. MySleep(100);
  1810. waitForNoPendingTokens(c);
  1811. #endif
  1812. exit:
  1813. rc = MQTTAsync_disconnect(c, NULL);
  1814. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1815. rc = MQTTAsync_disconnect(d, NULL);
  1816. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1817. MySleep(200);
  1818. MQTTAsync_destroy(&c);
  1819. MQTTAsync_destroy(&d);
  1820. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1821. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1822. write_test_result();
  1823. return failures;
  1824. }
  1825. void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  1826. {
  1827. printf("%s\n", message);
  1828. }
  1829. int main(int argc, char** argv)
  1830. {
  1831. int* numtests = &tests;
  1832. int rc = 0;
  1833. int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6 };
  1834. time_t randtime;
  1835. srand((unsigned) time(&randtime));
  1836. sprintf(unique, "%u", rand());
  1837. MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
  1838. xml = fopen("TEST-test9.xml", "w");
  1839. fprintf(xml, "<testsuite name=\"test9\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
  1840. MQTTAsync_setTraceCallback(handleTrace);
  1841. getopts(argc, argv);
  1842. if (options.test_no == 0)
  1843. { /* run all the tests */
  1844. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  1845. {
  1846. failures = 0;
  1847. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  1848. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  1849. }
  1850. }
  1851. else
  1852. {
  1853. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  1854. rc = tests[options.test_no](options); /* run just the selected test */
  1855. }
  1856. MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
  1857. if (rc == 0)
  1858. MyLog(LOGA_INFO, "verdict pass");
  1859. else
  1860. MyLog(LOGA_INFO, "verdict fail");
  1861. fprintf(xml, "</testsuite>\n");
  1862. fclose(xml);
  1863. return rc;
  1864. }