test10.c 58 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854
  1. /*******************************************************************************
  2. * Copyright (c) 2009, 2020 IBM Corp. and others
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - MQTT 5.0 support
  16. *******************************************************************************/
  17. /**
  18. * @file
  19. * MQTT V5 specific tests for the MQTT C client
  20. *
  21. * - topic aliases
  22. * - subscription ids
  23. * - session expiry
  24. * - payload format
  25. * - flow control
  26. * - QoS 2 exchange termination
  27. * - request/response
  28. * - shared subscriptions
  29. * - server initiated disconnect
  30. * - auth packets
  31. * - server assigned clientid returned in a property
  32. * - server defined keepalive
  33. * - subscribe failure
  34. */
  35. #include "MQTTClient.h"
  36. #include <string.h>
  37. #include <stdlib.h>
  38. #if !defined(_WINDOWS)
  39. #include <sys/time.h>
  40. #include <sys/socket.h>
  41. #include <unistd.h>
  42. #include <errno.h>
  43. #else
  44. #include <windows.h>
  45. #define setenv(a, b, c) _putenv_s(a, b)
  46. #endif
  47. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  48. void usage(void)
  49. {
  50. printf("help!!\n");
  51. exit(EXIT_FAILURE);
  52. }
  53. struct Options
  54. {
  55. char* connection; /**< connection to system under test. */
  56. char** haconnections;
  57. char* proxy_connection;
  58. int hacount;
  59. int verbose;
  60. int test_no;
  61. int MQTTVersion;
  62. int iterations;
  63. } options =
  64. {
  65. "tcp://localhost:1883",
  66. NULL,
  67. "tcp://localhost:1884",
  68. 0,
  69. 0,
  70. 0,
  71. MQTTVERSION_5,
  72. 1,
  73. };
  74. void getopts(int argc, char** argv)
  75. {
  76. int count = 1;
  77. while (count < argc)
  78. {
  79. if (strcmp(argv[count], "--test_no") == 0)
  80. {
  81. if (++count < argc)
  82. options.test_no = atoi(argv[count]);
  83. else
  84. usage();
  85. }
  86. else if (strcmp(argv[count], "--connection") == 0)
  87. {
  88. if (++count < argc)
  89. {
  90. options.connection = argv[count];
  91. printf("\nSetting connection to %s\n", options.connection);
  92. }
  93. else
  94. usage();
  95. }
  96. else if (strcmp(argv[count], "--haconnections") == 0)
  97. {
  98. if (++count < argc)
  99. {
  100. char* tok = strtok(argv[count], " ");
  101. options.hacount = 0;
  102. options.haconnections = malloc(sizeof(char*) * 5);
  103. while (tok)
  104. {
  105. options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
  106. strcpy(options.haconnections[options.hacount], tok);
  107. options.hacount++;
  108. tok = strtok(NULL, " ");
  109. }
  110. }
  111. else
  112. usage();
  113. }
  114. else if (strcmp(argv[count], "--proxy_connection") == 0)
  115. {
  116. if (++count < argc)
  117. options.proxy_connection = argv[count];
  118. else
  119. usage();
  120. }
  121. else if (strcmp(argv[count], "--MQTTversion") == 0)
  122. {
  123. if (++count < argc)
  124. {
  125. options.MQTTVersion = atoi(argv[count]);
  126. printf("setting MQTT version to %d\n", options.MQTTVersion);
  127. }
  128. else
  129. usage();
  130. }
  131. else if (strcmp(argv[count], "--iterations") == 0)
  132. {
  133. if (++count < argc)
  134. options.iterations = atoi(argv[count]);
  135. else
  136. usage();
  137. }
  138. else if (strcmp(argv[count], "--verbose") == 0)
  139. {
  140. options.verbose = 1;
  141. printf("\nSetting verbose on\n");
  142. }
  143. count++;
  144. }
  145. }
  146. #define LOGA_DEBUG 0
  147. #define LOGA_INFO 1
  148. #include <stdarg.h>
  149. #include <time.h>
  150. #include <sys/timeb.h>
  151. void MyLog(int LOGA_level, char* format, ...)
  152. {
  153. static char msg_buf[256];
  154. va_list args;
  155. #if defined(_WIN32) || defined(_WINDOWS)
  156. struct timeb ts;
  157. #else
  158. struct timeval ts;
  159. #endif
  160. struct tm timeinfo;
  161. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  162. return;
  163. #if defined(_WIN32) || defined(_WINDOWS)
  164. ftime(&ts);
  165. localtime_s(&timeinfo, &ts.time);
  166. #else
  167. gettimeofday(&ts, NULL);
  168. localtime_r(&ts.tv_sec, &timeinfo);
  169. #endif
  170. strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
  171. #if defined(_WIN32) || defined(_WINDOWS)
  172. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  173. #else
  174. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000L);
  175. #endif
  176. va_start(args, format);
  177. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  178. va_end(args);
  179. printf("%s\n", msg_buf);
  180. fflush(stdout);
  181. }
  182. #if defined(_WIN32) || defined(_WINDOWS)
  183. #define mqsleep(A) Sleep(1000*A)
  184. #define START_TIME_TYPE DWORD
  185. static DWORD start_time = 0;
  186. START_TIME_TYPE start_clock(void)
  187. {
  188. return GetTickCount();
  189. }
  190. #elif defined(AIX)
  191. #define mqsleep sleep
  192. #define START_TIME_TYPE struct timespec
  193. START_TIME_TYPE start_clock(void)
  194. {
  195. static struct timespec start;
  196. clock_gettime(CLOCK_REALTIME, &start);
  197. return start;
  198. }
  199. #else
  200. #define mqsleep sleep
  201. #define START_TIME_TYPE struct timeval
  202. /* TODO - unused - remove? static struct timeval start_time; */
  203. START_TIME_TYPE start_clock(void)
  204. {
  205. struct timeval start_time;
  206. gettimeofday(&start_time, NULL);
  207. return start_time;
  208. }
  209. #endif
  210. #if defined(_WIN32)
  211. long elapsed(START_TIME_TYPE start_time)
  212. {
  213. return GetTickCount() - start_time;
  214. }
  215. #elif defined(AIX)
  216. #define assert(a)
  217. long elapsed(struct timespec start)
  218. {
  219. struct timespec now, res;
  220. clock_gettime(CLOCK_REALTIME, &now);
  221. ntimersub(now, start, res);
  222. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  223. }
  224. #else
  225. long elapsed(START_TIME_TYPE start_time)
  226. {
  227. struct timeval now, res;
  228. gettimeofday(&now, NULL);
  229. timersub(&now, &start_time, &res);
  230. return (res.tv_sec)*1000 + (res.tv_usec)/1000;
  231. }
  232. #endif
  233. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  234. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  235. int tests = 0;
  236. int failures = 0;
  237. FILE* xml;
  238. START_TIME_TYPE global_start_time;
  239. char output[3000];
  240. char* cur_output = output;
  241. void write_test_result(void)
  242. {
  243. long duration = elapsed(global_start_time);
  244. fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
  245. if (cur_output != output)
  246. {
  247. fprintf(xml, "%s", output);
  248. cur_output = output;
  249. }
  250. fprintf(xml, "</testcase>\n");
  251. }
  252. void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
  253. {
  254. ++tests;
  255. if (!value)
  256. {
  257. va_list args;
  258. ++failures;
  259. MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
  260. va_start(args, format);
  261. vprintf(format, args);
  262. va_end(args);
  263. cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
  264. description, filename, lineno);
  265. }
  266. else
  267. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
  268. }
  269. void logProperties(MQTTProperties *props)
  270. {
  271. int i = 0;
  272. for (i = 0; i < props->count; ++i)
  273. {
  274. int id = props->array[i].identifier;
  275. const char* name = MQTTPropertyName(id);
  276. char* intformat = "Property name %s value %d";
  277. switch (MQTTProperty_getType(id))
  278. {
  279. case MQTTPROPERTY_TYPE_BYTE:
  280. MyLog(LOGA_INFO, intformat, name, props->array[i].value.byte);
  281. break;
  282. case MQTTPROPERTY_TYPE_TWO_BYTE_INTEGER:
  283. MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer2);
  284. break;
  285. case MQTTPROPERTY_TYPE_FOUR_BYTE_INTEGER:
  286. MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
  287. break;
  288. case MQTTPROPERTY_TYPE_VARIABLE_BYTE_INTEGER:
  289. MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4);
  290. break;
  291. case MQTTPROPERTY_TYPE_BINARY_DATA:
  292. case MQTTPROPERTY_TYPE_UTF_8_ENCODED_STRING:
  293. MyLog(LOGA_INFO, "Property name value %s %.*s", name,
  294. props->array[i].value.data.len, props->array[i].value.data.data);
  295. break;
  296. case MQTTPROPERTY_TYPE_UTF_8_STRING_PAIR:
  297. MyLog(LOGA_INFO, "Property name %s key %.*s value %.*s", name,
  298. props->array[i].value.data.len, props->array[i].value.data.data,
  299. props->array[i].value.value.len, props->array[i].value.value.data);
  300. break;
  301. }
  302. }
  303. }
  304. struct
  305. {
  306. int disconnected;
  307. } test_topic_aliases_globals =
  308. {
  309. 0,
  310. };
  311. void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc)
  312. {
  313. MQTTClient c = (MQTTClient)context;
  314. MyLog(LOGA_INFO, "Callback: disconnected, reason code \"%s\"", MQTTReasonCode_toString(rc));
  315. logProperties(props);
  316. test_topic_aliases_globals.disconnected = 1;
  317. }
  318. static int messages_arrived = 0;
  319. int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  320. {
  321. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  322. topicName, message->payloadlen, (char*)(message->payload));
  323. assert("Message structure version should be 1", message->struct_version == 1,
  324. "message->struct_version was %d", message->struct_version);
  325. if (message->struct_version == 1)
  326. {
  327. const int props_count = 0;
  328. assert("Properties count should be 0", message->properties.count == props_count,
  329. "Properties count was %d\n", message->properties.count);
  330. logProperties(&message->properties);
  331. }
  332. messages_arrived++;
  333. MQTTClient_free(topicName);
  334. MQTTClient_freeMessage(&message);
  335. return 1;
  336. }
  337. int test_client_topic_aliases(struct Options options)
  338. {
  339. int subsqos = 2;
  340. MQTTClient c;
  341. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  342. MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
  343. MQTTProperties props = MQTTProperties_initializer;
  344. MQTTProperties connect_props = MQTTProperties_initializer;
  345. MQTTProperty property;
  346. MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
  347. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  348. MQTTResponse response = MQTTResponse_initializer;
  349. MQTTClient_deliveryToken dt;
  350. int rc = 0;
  351. int count = 0;
  352. char* test_topic = "test_client_topic_aliases";
  353. int topicAliasMaximum = 0;
  354. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  355. fprintf(xml, "<testcase classname=\"test_client_topic_aliases\" name=\"client topic aliases\"");
  356. global_start_time = start_clock();
  357. failures = 0;
  358. MyLog(LOGA_INFO, "Starting test 1 - client topic aliases");
  359. createOpts.MQTTVersion = MQTTVERSION_5;
  360. rc = MQTTClient_createWithOptions(&c, options.connection, "client_topic_alias_test",
  361. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  362. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  363. if (rc != MQTTCLIENT_SUCCESS)
  364. {
  365. MQTTClient_destroy(&c);
  366. goto exit;
  367. }
  368. rc = MQTTClient_setCallbacks(c, NULL, NULL, messageArrived, NULL);
  369. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  370. rc = MQTTClient_setDisconnected(c, c, disconnected);
  371. assert("Good rc from setDisconnected", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  372. opts.keepAliveInterval = 20;
  373. opts.cleanstart = 1;
  374. opts.MQTTVersion = options.MQTTVersion;
  375. if (options.haconnections != NULL)
  376. {
  377. opts.serverURIs = options.haconnections;
  378. opts.serverURIcount = options.hacount;
  379. }
  380. MyLog(LOGA_DEBUG, "Connecting");
  381. response = MQTTClient_connect5(c, &opts, NULL, NULL);
  382. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  383. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  384. goto exit;
  385. if (response.properties)
  386. {
  387. logProperties(response.properties);
  388. MQTTResponse_free(response);
  389. }
  390. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  391. pubmsg.payloadlen = 11;
  392. pubmsg.qos = 1;
  393. pubmsg.retained = 0;
  394. /* a Topic Alias of 0 is not allowed, so we should be disconnected */
  395. property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
  396. property.value.integer2 = 0;
  397. MQTTProperties_add(&pubmsg.properties, &property);
  398. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  399. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  400. /* Now we expect to receive a disconnect packet telling us why */
  401. count = 0;
  402. while (test_topic_aliases_globals.disconnected == 0 && ++count < 10)
  403. {
  404. #if defined(_WIN32)
  405. Sleep(1000);
  406. #else
  407. usleep(1000000L);
  408. #endif
  409. }
  410. assert("Disconnected should be called", test_topic_aliases_globals.disconnected == 1,
  411. "was %d", test_topic_aliases_globals.disconnected);
  412. property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
  413. property.value.integer4 = 30;
  414. MQTTProperties_add(&connect_props, &property);
  415. /* Now try a valid topic alias */
  416. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  417. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  418. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  419. goto exit;
  420. if (response.properties)
  421. {
  422. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM))
  423. topicAliasMaximum = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM);
  424. logProperties(response.properties);
  425. MQTTResponse_free(response);
  426. }
  427. assert("topicAliasMaximum > 0", topicAliasMaximum > 0, "topicAliasMaximum was %d", topicAliasMaximum);
  428. /* subscribe to a topic */
  429. response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
  430. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  431. /* then publish to the topic */
  432. MQTTProperties_free(&pubmsg.properties);
  433. property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
  434. property.value.integer2 = 1;
  435. MQTTProperties_add(&pubmsg.properties, &property);
  436. messages_arrived = 0;
  437. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  438. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  439. /* should get a response */
  440. while (messages_arrived == 0 && ++count < 10)
  441. {
  442. #if defined(_WIN32)
  443. Sleep(1000);
  444. #else
  445. usleep(1000000L);
  446. #endif
  447. }
  448. assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
  449. /* now publish to the topic alias only */
  450. messages_arrived = 0;
  451. response = MQTTClient_publishMessage5(c, "", &pubmsg, &dt);
  452. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  453. /* should get a response */
  454. while (messages_arrived == 0 && ++count < 10)
  455. {
  456. #if defined(_WIN32)
  457. Sleep(1000);
  458. #else
  459. usleep(1000000L);
  460. #endif
  461. }
  462. assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
  463. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  464. /* Reconnect. Topic aliases should be deleted, but not subscription */
  465. opts.cleanstart = 0;
  466. response = MQTTClient_connect5(c, &opts, NULL, NULL);
  467. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  468. MQTTResponse_free(response);
  469. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  470. goto exit;
  471. /* then publish to the topic */
  472. MQTTProperties_free(&pubmsg.properties);
  473. messages_arrived = 0;
  474. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  475. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  476. /* should get a response */
  477. while (messages_arrived == 0 && ++count < 10)
  478. {
  479. #if defined(_WIN32)
  480. Sleep(1000);
  481. #else
  482. usleep(1000000L);
  483. #endif
  484. }
  485. assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived);
  486. /* now publish to the topic alias only */
  487. test_topic_aliases_globals.disconnected = 0;
  488. messages_arrived = 0;
  489. property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS;
  490. property.value.integer2 = 1;
  491. MQTTProperties_add(&pubmsg.properties, &property);
  492. response = MQTTClient_publishMessage5(c, "", &pubmsg, &dt);
  493. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  494. /* should not get a response */
  495. while (messages_arrived == 0 && ++count < 10)
  496. {
  497. #if defined(_WIN32)
  498. Sleep(1000);
  499. #else
  500. usleep(1000000L);
  501. #endif
  502. }
  503. assert("No message should have arrived", messages_arrived == 0, "was %d", messages_arrived);
  504. /* Now we expect to receive a disconnect packet telling us why */
  505. count = 0;
  506. while (test_topic_aliases_globals.disconnected == 0 && ++count < 10)
  507. {
  508. #if defined(_WIN32)
  509. Sleep(1000);
  510. #else
  511. usleep(1000000L);
  512. #endif
  513. }
  514. assert("Disconnected should be called", test_topic_aliases_globals.disconnected == 1,
  515. "was %d", test_topic_aliases_globals.disconnected);
  516. MQTTProperties_free(&pubmsg.properties);
  517. MQTTProperties_free(&props);
  518. MQTTProperties_free(&connect_props);
  519. MQTTClient_destroy(&c);
  520. exit:
  521. MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.",
  522. (failures == 0) ? "passed" : "failed", tests, failures);
  523. write_test_result();
  524. return failures;
  525. }
  526. int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  527. {
  528. static int received = 0;
  529. static int first_topic_alias = 0;
  530. int topicAlias = 0;
  531. received++;
  532. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  533. topicName, message->payloadlen, (char*)(message->payload));
  534. assert("Message structure version should be 1", message->struct_version == 1,
  535. "message->struct_version was %d", message->struct_version);
  536. if (message->struct_version == 1)
  537. {
  538. const int props_count = 0;
  539. if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS))
  540. topicAlias = MQTTProperties_getNumericValue(&message->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS);
  541. if (received == 1)
  542. first_topic_alias = topicAlias;
  543. else
  544. assert("All topic aliases should be the same", topicAlias == first_topic_alias,
  545. "Topic alias was %d\n", topicAlias);
  546. assert("topicAlias should not be 0", topicAlias > 0, "Topic alias was %d\n", topicAlias);
  547. logProperties(&message->properties);
  548. }
  549. messages_arrived++;
  550. MQTTClient_free(topicName);
  551. MQTTClient_freeMessage(&message);
  552. return 1;
  553. }
  554. int test_server_topic_aliases(struct Options options)
  555. {
  556. int subsqos = 2;
  557. MQTTClient c;
  558. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  559. MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
  560. MQTTProperties connect_props = MQTTProperties_initializer;
  561. MQTTProperty property;
  562. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  563. MQTTResponse response = MQTTResponse_initializer;
  564. MQTTClient_deliveryToken dt;
  565. int rc = 0;
  566. int count = 0;
  567. char* test_topic = "test_server_topic_aliases";
  568. int topicAliasMaximum = 0;
  569. int qos = 0;
  570. const int msg_count = 3;
  571. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  572. fprintf(xml, "<testcase classname=\"test_server_topic_aliases\" name=\"server topic aliases\"");
  573. global_start_time = start_clock();
  574. failures = 0;
  575. MyLog(LOGA_INFO, "Starting test 2 - server topic aliases");
  576. createOpts.MQTTVersion = MQTTVERSION_5;
  577. rc = MQTTClient_createWithOptions(&c, options.connection, "server_topic_alias_test",
  578. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  579. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  580. if (rc != MQTTCLIENT_SUCCESS)
  581. {
  582. MQTTClient_destroy(&c);
  583. goto exit;
  584. }
  585. rc = MQTTClient_setCallbacks(c, NULL, NULL, test2_messageArrived, NULL);
  586. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  587. opts.keepAliveInterval = 20;
  588. opts.cleanstart = 1;
  589. opts.MQTTVersion = options.MQTTVersion;
  590. if (options.haconnections != NULL)
  591. {
  592. opts.serverURIs = options.haconnections;
  593. opts.serverURIcount = options.hacount;
  594. }
  595. /* Allow at least one server topic alias */
  596. property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM;
  597. property.value.integer2 = 1;
  598. MQTTProperties_add(&connect_props, &property);
  599. MyLog(LOGA_DEBUG, "Connecting");
  600. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  601. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  602. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  603. goto exit;
  604. if (response.properties)
  605. {
  606. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM))
  607. topicAliasMaximum = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM);
  608. logProperties(response.properties);
  609. MQTTResponse_free(response);
  610. }
  611. /* subscribe to a topic */
  612. response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
  613. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  614. messages_arrived = 0;
  615. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  616. pubmsg.payloadlen = 11;
  617. pubmsg.retained = 0;
  618. for (qos = 0; qos < msg_count; ++qos)
  619. {
  620. pubmsg.qos = qos;
  621. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  622. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  623. }
  624. /* should get responses */
  625. while (messages_arrived < msg_count && ++count < 10)
  626. {
  627. #if defined(_WIN32)
  628. Sleep(1000);
  629. #else
  630. usleep(1000000L);
  631. #endif
  632. }
  633. assert("3 messages should have arrived", messages_arrived == msg_count, "was %d", messages_arrived);
  634. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  635. MQTTProperties_free(&pubmsg.properties);
  636. MQTTProperties_free(&connect_props);
  637. MQTTClient_destroy(&c);
  638. exit:
  639. MyLog(LOGA_INFO, "TEST2: test %s. %d tests run, %d failures.",
  640. (failures == 0) ? "passed" : "failed", tests, failures);
  641. write_test_result();
  642. return failures;
  643. }
  644. int test_subscription_ids_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  645. {
  646. static int received = 0;
  647. static int first_topic_alias = 0;
  648. int topicAlias = 0;
  649. received++;
  650. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  651. topicName, message->payloadlen, (char*)(message->payload));
  652. assert("Message structure version should be 1", message->struct_version == 1,
  653. "message->struct_version was %d", message->struct_version);
  654. if (message->struct_version == 1)
  655. {
  656. int subsidcount = 0, i = 0;
  657. subsidcount = MQTTProperties_propertyCount(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER);
  658. for (i = 0; i < subsidcount; ++i)
  659. {
  660. int subsid = MQTTProperties_getNumericValueAt(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER, i);
  661. assert("Subsid is i+1", subsid == i+1, "subsid is not correct %d\n", subsid);
  662. }
  663. logProperties(&message->properties);
  664. }
  665. messages_arrived++;
  666. MQTTClient_free(topicName);
  667. MQTTClient_freeMessage(&message);
  668. return 1;
  669. }
  670. int test_subscription_ids(struct Options options)
  671. {
  672. int subsqos = 2;
  673. MQTTClient c;
  674. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  675. MQTTProperties connect_props = MQTTProperties_initializer;
  676. MQTTProperties subs_props = MQTTProperties_initializer;
  677. MQTTProperty property;
  678. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  679. MQTTResponse response = MQTTResponse_initializer;
  680. MQTTClient_deliveryToken dt;
  681. int rc = 0;
  682. int count = 0;
  683. char* test_topic = "test_subscription_ids";
  684. const int msg_count = 1;
  685. int subsids = 1;
  686. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  687. fprintf(xml, "<testcase classname=\"test_subscription_ids\" name=\"subscription ids\"");
  688. global_start_time = start_clock();
  689. failures = 0;
  690. MyLog(LOGA_INFO, "Starting test 3 - subscription ids");
  691. createOpts.MQTTVersion = MQTTVERSION_5;
  692. rc = MQTTClient_createWithOptions(&c, options.connection, "subscription_ids",
  693. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  694. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  695. if (rc != MQTTCLIENT_SUCCESS)
  696. {
  697. MQTTClient_destroy(&c);
  698. goto exit;
  699. }
  700. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_subscription_ids_messageArrived, NULL);
  701. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  702. opts.keepAliveInterval = 20;
  703. opts.cleanstart = 1;
  704. opts.MQTTVersion = options.MQTTVersion;
  705. if (options.haconnections != NULL)
  706. {
  707. opts.serverURIs = options.haconnections;
  708. opts.serverURIcount = options.hacount;
  709. }
  710. MyLog(LOGA_DEBUG, "Connecting");
  711. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  712. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  713. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  714. goto exit;
  715. if (response.properties)
  716. {
  717. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE))
  718. subsids = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE);
  719. logProperties(response.properties);
  720. MQTTResponse_free(response);
  721. }
  722. assert("Subscription ids must be available", subsids == 1, "subsids is %d", subsids);
  723. /* subscribe to the test topic */
  724. property.identifier = MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER;
  725. property.value.integer4 = 1;
  726. MQTTProperties_add(&subs_props, &property);
  727. response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &subs_props);
  728. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  729. /* now to an overlapping topic */
  730. property.value.integer4 = 2;
  731. subs_props.array[0].value.integer4 = 2;
  732. response = MQTTClient_subscribe5(c, "+", 2, NULL, &subs_props);
  733. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  734. messages_arrived = 0;
  735. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  736. pubmsg.payloadlen = 11;
  737. pubmsg.retained = 0;
  738. pubmsg.qos = 2;
  739. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  740. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  741. /* should get responses */
  742. while (messages_arrived < msg_count && ++count < 10)
  743. {
  744. #if defined(_WIN32)
  745. Sleep(1000);
  746. #else
  747. usleep(1000000L);
  748. #endif
  749. }
  750. assert("1 message should have arrived", messages_arrived == msg_count, "was %d", messages_arrived);
  751. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  752. MQTTProperties_free(&pubmsg.properties);
  753. MQTTProperties_free(&subs_props);
  754. MQTTProperties_free(&connect_props);
  755. MQTTClient_destroy(&c);
  756. exit:
  757. MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
  758. (failures == 0) ? "passed" : "failed", tests, failures);
  759. write_test_result();
  760. return failures;
  761. }
  762. int test_flow_control_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  763. {
  764. static int received = 0;
  765. static int first_topic_alias = 0;
  766. int topicAlias = 0;
  767. received++;
  768. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  769. topicName, message->payloadlen, (char*)(message->payload));
  770. assert("Message structure version should be 1", message->struct_version == 1,
  771. "message->struct_version was %d", message->struct_version);
  772. messages_arrived++;
  773. MQTTClient_free(topicName);
  774. MQTTClient_freeMessage(&message);
  775. return 1;
  776. }
  777. static int blocking_found = 0;
  778. void test_flow_control_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
  779. {
  780. static char* msg = "Blocking publish on queue full";
  781. if (strstr(message, msg) != NULL)
  782. blocking_found = 1;
  783. }
  784. int test_flow_control(struct Options options)
  785. {
  786. int subsqos = 2;
  787. MQTTClient c;
  788. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  789. MQTTProperties connect_props = MQTTProperties_initializer;
  790. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  791. MQTTResponse response = MQTTResponse_initializer;
  792. MQTTClient_deliveryToken dt;
  793. int rc = 0, i = 0, count = 0;
  794. char* test_topic = "test_flow_control";
  795. int receive_maximum = 65535;
  796. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  797. fprintf(xml, "<testcase classname=\"test_flow_control\" name=\"flow control\"");
  798. global_start_time = start_clock();
  799. failures = 0;
  800. MyLog(LOGA_INFO, "Starting test - flow control");
  801. MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_MINIMUM); /* to get the blocking trace message */
  802. createOpts.MQTTVersion = MQTTVERSION_5;
  803. rc = MQTTClient_createWithOptions(&c, options.connection, "flow_control",
  804. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  805. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  806. if (rc != MQTTCLIENT_SUCCESS)
  807. goto exit;
  808. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
  809. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  810. opts.keepAliveInterval = 20;
  811. opts.cleanstart = 1;
  812. opts.MQTTVersion = options.MQTTVersion;
  813. opts.reliable = 0;
  814. opts.maxInflightMessages = 100;
  815. if (options.haconnections != NULL)
  816. {
  817. opts.serverURIs = options.haconnections;
  818. opts.serverURIcount = options.hacount;
  819. }
  820. MyLog(LOGA_DEBUG, "Connecting");
  821. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  822. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  823. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  824. goto exit;
  825. if (response.properties)
  826. {
  827. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  828. receive_maximum = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  829. logProperties(response.properties);
  830. MQTTResponse_free(response);
  831. }
  832. response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL);
  833. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  834. messages_arrived = 0;
  835. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  836. pubmsg.payloadlen = 11;
  837. pubmsg.retained = 0;
  838. pubmsg.qos = 2;
  839. for (i = 0; i < receive_maximum + 2; ++i)
  840. {
  841. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  842. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  843. }
  844. /* should get responses */
  845. while (messages_arrived < receive_maximum + 2 && ++count < 10)
  846. {
  847. #if defined(_WIN32)
  848. Sleep(1000);
  849. #else
  850. usleep(1000000L);
  851. #endif
  852. }
  853. assert("messages should have arrived", messages_arrived == receive_maximum + 2, "was %d", messages_arrived);
  854. assert("should have blocked", blocking_found == 1, "was %d\n", blocking_found);
  855. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  856. exit:
  857. MQTTClient_setTraceCallback(NULL);
  858. MQTTProperties_free(&pubmsg.properties);
  859. MQTTProperties_free(&connect_props);
  860. MQTTClient_destroy(&c);
  861. MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
  862. (failures == 0) ? "passed" : "failed", tests, failures);
  863. write_test_result();
  864. return failures;
  865. }
  866. int test_error_reporting(struct Options options)
  867. {
  868. int subsqos = 2;
  869. MQTTClient c;
  870. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  871. MQTTProperties props = MQTTProperties_initializer;
  872. MQTTProperty property;
  873. MQTTResponse response = MQTTResponse_initializer;
  874. int rc = 0, i = 0, count = 0;
  875. char* test_topic = "test_error_reporting";
  876. int receive_maximum = 65535;
  877. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  878. fprintf(xml, "<testcase classname=\"test_error_reporting\" name=\"error reporting\"");
  879. global_start_time = start_clock();
  880. failures = 0;
  881. MyLog(LOGA_INFO, "Starting test - error reporting");
  882. //MQTTClient_setTraceCallback(test_flow_control_trace_callback);
  883. createOpts.MQTTVersion = MQTTVERSION_5;
  884. rc = MQTTClient_createWithOptions(&c, options.connection, "error_reporting",
  885. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  886. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  887. if (rc != MQTTCLIENT_SUCCESS)
  888. goto exit;
  889. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
  890. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  891. opts.MQTTVersion = options.MQTTVersion;
  892. if (options.haconnections != NULL)
  893. {
  894. opts.serverURIs = options.haconnections;
  895. opts.serverURIcount = options.hacount;
  896. }
  897. MyLog(LOGA_DEBUG, "Connecting");
  898. response = MQTTClient_connect5(c, &opts, NULL, NULL);
  899. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  900. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  901. goto exit;
  902. if (response.properties)
  903. {
  904. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  905. receive_maximum = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  906. logProperties(response.properties);
  907. MQTTResponse_free(response);
  908. }
  909. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  910. property.value.data.data = "unsub user property";
  911. property.value.data.len = (int)strlen(property.value.data.data);
  912. property.value.value.data = "unsub user property value";
  913. property.value.value.len = (int)strlen(property.value.value.data);
  914. MQTTProperties_add(&props, &property);
  915. response = MQTTClient_subscribe5(c, test_topic, 2, NULL, &props);
  916. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  917. assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
  918. if (response.properties)
  919. {
  920. logProperties(response.properties);
  921. MQTTResponse_free(response);
  922. }
  923. response = MQTTClient_unsubscribe5(c, test_topic, &props);
  924. assert("Good rc from unsubscribe", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  925. assert("Properties should exist", response.properties != NULL, "props was %p", response.properties);
  926. if (response.properties)
  927. {
  928. logProperties(response.properties);
  929. MQTTResponse_free(response);
  930. }
  931. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  932. exit:
  933. MQTTClient_setTraceCallback(NULL);
  934. MQTTProperties_free(&props);
  935. MQTTClient_destroy(&c);
  936. MyLog(LOGA_INFO, "TEST3: test %s. %d tests run, %d failures.",
  937. (failures == 0) ? "passed" : "failed", tests, failures);
  938. write_test_result();
  939. return failures;
  940. }
  941. struct
  942. {
  943. int published;
  944. int packet_type;
  945. enum MQTTReasonCodes rc;
  946. } test_qos_1_2_errors_globals =
  947. {
  948. 0, -1, MQTTREASONCODE_SUCCESS
  949. };
  950. void published(void* context, int msgid, int packet_type, MQTTProperties* props, enum MQTTReasonCodes rc)
  951. {
  952. MQTTClient c = (MQTTClient)context;
  953. MyLog(LOGA_INFO, "Callback: published, reason code \"%s\" msgid: %d packet type: %d",
  954. MQTTReasonCode_toString(rc), msgid, packet_type);
  955. test_qos_1_2_errors_globals.packet_type = packet_type;
  956. test_qos_1_2_errors_globals.rc = rc;
  957. if (props)
  958. {
  959. MyLog(LOGA_INFO, "Callback: published, properties:");
  960. logProperties(props);
  961. }
  962. test_qos_1_2_errors_globals.published = 1;
  963. }
  964. void test_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message)
  965. {
  966. printf("%s\n", message);
  967. }
  968. enum msgTypes
  969. {
  970. CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
  971. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
  972. PINGREQ, PINGRESP, DISCONNECT, AUTH
  973. };
  974. int test_qos_1_2_errors(struct Options options)
  975. {
  976. int subsqos = 2;
  977. MQTTClient c;
  978. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  979. MQTTProperties props = MQTTProperties_initializer;
  980. MQTTProperty property;
  981. MQTTResponse response = MQTTResponse_initializer;
  982. MQTTClient_deliveryToken dt;
  983. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  984. int rc = 0, i = 0, count = 0;
  985. char* test_topic = "test_qos_1_2_errors";
  986. int receive_maximum = 65535;
  987. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  988. fprintf(xml, "<testcase classname=\"test_qos_1_2_errors\" name=\"qos 1 2 errors\"");
  989. global_start_time = start_clock();
  990. failures = 0;
  991. MyLog(LOGA_INFO, "Starting test - qos 1 and 2 errors");
  992. //MQTTClient_setTraceCallback(test_trace_callback);
  993. createOpts.MQTTVersion = MQTTVERSION_5;
  994. rc = MQTTClient_createWithOptions(&c, options.connection, "error_reporting",
  995. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  996. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  997. if (rc != MQTTCLIENT_SUCCESS)
  998. goto exit;
  999. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_flow_control_messageArrived, NULL);
  1000. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1001. rc = MQTTClient_setPublished(c, c, published);
  1002. assert("Good rc from setPublished", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1003. opts.MQTTVersion = options.MQTTVersion;
  1004. if (options.haconnections != NULL)
  1005. {
  1006. opts.serverURIs = options.haconnections;
  1007. opts.serverURIcount = options.hacount;
  1008. }
  1009. MyLog(LOGA_DEBUG, "Connecting");
  1010. response = MQTTClient_connect5(c, &opts, NULL, NULL);
  1011. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1012. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  1013. goto exit;
  1014. if (response.properties)
  1015. {
  1016. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
  1017. receive_maximum = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
  1018. logProperties(response.properties);
  1019. MQTTResponse_free(response);
  1020. }
  1021. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  1022. pubmsg.payloadlen = 11;
  1023. pubmsg.qos = 1;
  1024. pubmsg.retained = 0;
  1025. property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
  1026. property.value.data.data = "unsub user property";
  1027. property.value.data.len = (int)strlen(property.value.data.data);
  1028. property.value.value.data = "unsub user property value";
  1029. property.value.value.len = (int)strlen(property.value.value.data);
  1030. MQTTProperties_add(&pubmsg.properties, &property);
  1031. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  1032. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1033. count = 0;
  1034. while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
  1035. {
  1036. #if defined(_WIN32)
  1037. Sleep(1000);
  1038. #else
  1039. usleep(1000000L);
  1040. #endif
  1041. }
  1042. assert("Published called", test_qos_1_2_errors_globals.published == 1,
  1043. "published was %d", test_qos_1_2_errors_globals.published);
  1044. assert("Reason code was packet identifier not found",
  1045. test_qos_1_2_errors_globals.rc == MQTTREASONCODE_NOT_AUTHORIZED,
  1046. "Reason code was %d", test_qos_1_2_errors_globals.rc);
  1047. assert("Packet type was PUBACK", test_qos_1_2_errors_globals.packet_type == PUBACK,
  1048. "packet type was %d", test_qos_1_2_errors_globals.packet_type);
  1049. test_qos_1_2_errors_globals.published = 0;
  1050. pubmsg.qos = 2;
  1051. response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);
  1052. assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1053. count = 0;
  1054. while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
  1055. {
  1056. #if defined(_WIN32)
  1057. Sleep(1000);
  1058. #else
  1059. usleep(1000000L);
  1060. #endif
  1061. }
  1062. assert("Published called", test_qos_1_2_errors_globals.published == 1,
  1063. "published was %d", test_qos_1_2_errors_globals.published);
  1064. assert("Reason code was packet identifier not found",
  1065. test_qos_1_2_errors_globals.rc == MQTTREASONCODE_NOT_AUTHORIZED,
  1066. "Reason code was %d", test_qos_1_2_errors_globals.rc);
  1067. assert("Packet type was PUBREC", test_qos_1_2_errors_globals.packet_type == PUBREC,
  1068. "packet type was %d", test_qos_1_2_errors_globals.packet_type);
  1069. test_qos_1_2_errors_globals.published = 0;
  1070. response = MQTTClient_publishMessage5(c, "test_qos_1_2_errors_pubcomp", &pubmsg, &dt);
  1071. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1072. count = 0;
  1073. while (test_qos_1_2_errors_globals.published == 0 && ++count < 10)
  1074. {
  1075. #if defined(_WIN32)
  1076. Sleep(1000);
  1077. #else
  1078. usleep(1000000L);
  1079. #endif
  1080. }
  1081. assert("Published called", test_qos_1_2_errors_globals.published == 1,
  1082. "published was %d", test_qos_1_2_errors_globals.published);
  1083. assert("Reason code was packet identifier not found",
  1084. test_qos_1_2_errors_globals.rc == MQTTREASONCODE_PACKET_IDENTIFIER_NOT_FOUND,
  1085. "Reason code was %d", test_qos_1_2_errors_globals.rc);
  1086. assert("Packet type was PUBCOMP", test_qos_1_2_errors_globals.packet_type == PUBCOMP,
  1087. "packet type was %d", test_qos_1_2_errors_globals.packet_type);
  1088. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  1089. exit:
  1090. MQTTClient_setTraceCallback(NULL);
  1091. MQTTProperties_free(&props);
  1092. MQTTClient_destroy(&c);
  1093. MyLog(LOGA_INFO, "TEST6: test %s. %d tests run, %d failures.",
  1094. (failures == 0) ? "passed" : "failed", tests, failures);
  1095. write_test_result();
  1096. return failures;
  1097. }
  1098. struct
  1099. {
  1100. char* response_topic;
  1101. char* request_topic;
  1102. int messages_arrived;
  1103. char* correlation_id;
  1104. } test_request_response_globals =
  1105. {
  1106. "my response topic",
  1107. "my request topic",
  1108. 0,
  1109. "request no 1",
  1110. };
  1111. int test_request_response_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  1112. {
  1113. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  1114. topicName, message->payloadlen, (char*)(message->payload));
  1115. assert("Message structure version should be 1", message->struct_version == 1,
  1116. "message->struct_version was %d", message->struct_version);
  1117. if (message->struct_version == 1)
  1118. {
  1119. const int props_count = 0;
  1120. MyLog(LOGA_INFO, "Message properties:");
  1121. logProperties(&message->properties);
  1122. }
  1123. test_request_response_globals.messages_arrived++;
  1124. if (test_request_response_globals.messages_arrived == 1)
  1125. {
  1126. MQTTProperty *prop;
  1127. assert("Topic should be request",
  1128. strcmp(test_request_response_globals.request_topic, topicName) == 0,
  1129. "topic was %s\n", topicName);
  1130. if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_RESPONSE_TOPIC))
  1131. prop = MQTTProperties_getProperty(&message->properties, MQTTPROPERTY_CODE_RESPONSE_TOPIC);
  1132. assert("Topic should be response",
  1133. strncmp(test_request_response_globals.response_topic, prop->value.data.data, prop->value.data.len) == 0,
  1134. "topic was %.4s\n", prop->value.data.data);
  1135. if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_CORRELATION_DATA))
  1136. prop = MQTTProperties_getProperty(&message->properties, MQTTPROPERTY_CODE_CORRELATION_DATA);
  1137. assert("Correlation data should be",
  1138. strncmp(test_request_response_globals.correlation_id, prop->value.data.data, prop->value.data.len) == 0,
  1139. "Correlation data was %.4s\n", prop->value.data.data);
  1140. }
  1141. else if (test_request_response_globals.messages_arrived == 2)
  1142. {
  1143. assert("Topic should be response",
  1144. strcmp(test_request_response_globals.response_topic, topicName) == 0,
  1145. "topic was %s\n", topicName);
  1146. }
  1147. MQTTClient_free(topicName);
  1148. MQTTClient_freeMessage(&message);
  1149. return 1;
  1150. }
  1151. int test_request_response(struct Options options)
  1152. {
  1153. int subsqos = 2;
  1154. MQTTClient c;
  1155. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  1156. MQTTProperties connect_props = MQTTProperties_initializer;
  1157. MQTTProperties subs_props = MQTTProperties_initializer;
  1158. MQTTProperty property;
  1159. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  1160. MQTTResponse response = MQTTResponse_initializer;
  1161. MQTTClient_deliveryToken dt;
  1162. int rc = 0;
  1163. int count = 0;
  1164. char* test_topic = "test_request_response";
  1165. const int msg_count = 1;
  1166. int subsids = 1;
  1167. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  1168. fprintf(xml, "<testcase classname=\"test_request_response\" name=\"request/response\"");
  1169. global_start_time = start_clock();
  1170. failures = 0;
  1171. MyLog(LOGA_INFO, "Starting test 7 - request response");
  1172. createOpts.MQTTVersion = MQTTVERSION_5;
  1173. rc = MQTTClient_createWithOptions(&c, options.connection, "request_response",
  1174. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  1175. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  1176. if (rc != MQTTCLIENT_SUCCESS)
  1177. {
  1178. MQTTClient_destroy(&c);
  1179. goto exit;
  1180. }
  1181. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_request_response_messageArrived, NULL);
  1182. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1183. opts.keepAliveInterval = 20;
  1184. opts.cleanstart = 1;
  1185. opts.MQTTVersion = options.MQTTVersion;
  1186. if (options.haconnections != NULL)
  1187. {
  1188. opts.serverURIs = options.haconnections;
  1189. opts.serverURIcount = options.hacount;
  1190. }
  1191. MyLog(LOGA_DEBUG, "Connecting");
  1192. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  1193. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1194. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  1195. goto exit;
  1196. if (response.properties)
  1197. {
  1198. if (MQTTProperties_hasProperty(response.properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE))
  1199. subsids = MQTTProperties_getNumericValue(response.properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE);
  1200. MyLog(LOGA_INFO, "Connack properties:");
  1201. logProperties(response.properties);
  1202. MQTTResponse_free(response);
  1203. }
  1204. response = MQTTClient_subscribe5(c, test_request_response_globals.response_topic, 2, NULL, NULL);
  1205. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1206. response = MQTTClient_subscribe5(c, test_request_response_globals.request_topic, 2, NULL, NULL);
  1207. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1208. messages_arrived = 0;
  1209. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  1210. pubmsg.payloadlen = 11;
  1211. pubmsg.retained = 0;
  1212. pubmsg.qos = 2;
  1213. property.identifier = MQTTPROPERTY_CODE_RESPONSE_TOPIC;
  1214. property.value.data.data = test_request_response_globals.response_topic;
  1215. property.value.data.len = (int)strlen(property.value.data.data);
  1216. MQTTProperties_add(&pubmsg.properties, &property);
  1217. property.identifier = MQTTPROPERTY_CODE_CORRELATION_DATA;
  1218. property.value.data.data = test_request_response_globals.correlation_id;
  1219. property.value.data.len = (int)strlen(property.value.data.data);
  1220. MQTTProperties_add(&pubmsg.properties, &property);
  1221. response = MQTTClient_publishMessage5(c, test_request_response_globals.request_topic, &pubmsg, &dt);
  1222. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1223. /* should get the request */
  1224. while (test_request_response_globals.messages_arrived < 1 && ++count < 10)
  1225. {
  1226. #if defined(_WIN32)
  1227. Sleep(1000);
  1228. #else
  1229. usleep(1000000L);
  1230. #endif
  1231. }
  1232. assert("1 message should have arrived", test_request_response_globals.messages_arrived == 1, "was %d",
  1233. test_request_response_globals.messages_arrived);
  1234. MQTTProperties_free(&pubmsg.properties);
  1235. property.identifier = MQTTPROPERTY_CODE_CORRELATION_DATA;
  1236. property.value.data.data = "request no 1";
  1237. property.value.data.len = (int)strlen(property.value.data.data);
  1238. MQTTProperties_add(&pubmsg.properties, &property);
  1239. response = MQTTClient_publishMessage5(c, test_request_response_globals.response_topic, &pubmsg, &dt);
  1240. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1241. /* should get the response */
  1242. while (test_request_response_globals.messages_arrived < 1 && ++count < 10)
  1243. {
  1244. #if defined(_WIN32)
  1245. Sleep(1000);
  1246. #else
  1247. usleep(1000000L);
  1248. #endif
  1249. }
  1250. assert("1 message should have arrived", test_request_response_globals.messages_arrived == 1, "was %d",
  1251. test_request_response_globals.messages_arrived);
  1252. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  1253. MQTTProperties_free(&pubmsg.properties);
  1254. MQTTProperties_free(&subs_props);
  1255. MQTTProperties_free(&connect_props);
  1256. MQTTClient_destroy(&c);
  1257. exit:
  1258. MyLog(LOGA_INFO, "TEST7: test %s. %d tests run, %d failures.",
  1259. (failures == 0) ? "passed" : "failed", tests, failures);
  1260. write_test_result();
  1261. return failures;
  1262. }
  1263. struct
  1264. {
  1265. char* topic;
  1266. int messages_arrived;
  1267. } test_subscribe_options_globals =
  1268. {
  1269. "my response topic",
  1270. 0,
  1271. };
  1272. int test_subscribe_options_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  1273. {
  1274. int subsidcount = 0, i = 0, subsid = -1;
  1275. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  1276. topicName, message->payloadlen, (char*)(message->payload));
  1277. assert("Message structure version should be 1", message->struct_version == 1,
  1278. "message->struct_version was %d", message->struct_version);
  1279. if (message->struct_version == 1)
  1280. {
  1281. const int props_count = 0;
  1282. MyLog(LOGA_INFO, "Message properties:");
  1283. logProperties(&message->properties);
  1284. }
  1285. test_subscribe_options_globals.messages_arrived++;
  1286. if (test_subscribe_options_globals.messages_arrived == 1)
  1287. {
  1288. subsidcount = MQTTProperties_propertyCount(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER);
  1289. assert("Subsidcount is i", subsidcount == 1, "subsidcount is not correct %d\n", subsidcount);
  1290. subsid = MQTTProperties_getNumericValueAt(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER, 0);
  1291. assert("Subsid is 2", subsid == 2, "subsid is not correct %d\n", subsid);
  1292. }
  1293. MQTTClient_free(topicName);
  1294. MQTTClient_freeMessage(&message);
  1295. return 1;
  1296. }
  1297. int test_subscribe_options(struct Options options)
  1298. {
  1299. int subsqos = 2;
  1300. MQTTClient c;
  1301. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  1302. MQTTProperties connect_props = MQTTProperties_initializer;
  1303. MQTTProperties subs_props = MQTTProperties_initializer;
  1304. MQTTProperty property;
  1305. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  1306. MQTTResponse response = MQTTResponse_initializer;
  1307. MQTTClient_deliveryToken dt;
  1308. int rc = 0;
  1309. int count = 0;
  1310. const int msg_count = 1;
  1311. MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
  1312. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  1313. fprintf(xml, "<testcase classname=\"test_subscribe_options\" name=\"subscribe options\"");
  1314. global_start_time = start_clock();
  1315. failures = 0;
  1316. MyLog(LOGA_INFO, "Starting test 8 - subscribe options");
  1317. createOpts.MQTTVersion = MQTTVERSION_5;
  1318. rc = MQTTClient_createWithOptions(&c, options.connection, "subscribe_options",
  1319. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  1320. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  1321. if (rc != MQTTCLIENT_SUCCESS)
  1322. {
  1323. MQTTClient_destroy(&c);
  1324. goto exit;
  1325. }
  1326. rc = MQTTClient_setCallbacks(c, NULL, NULL, test_subscribe_options_messageArrived, NULL);
  1327. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1328. opts.keepAliveInterval = 20;
  1329. opts.cleanstart = 1;
  1330. opts.MQTTVersion = options.MQTTVersion;
  1331. if (options.haconnections != NULL)
  1332. {
  1333. opts.serverURIs = options.haconnections;
  1334. opts.serverURIcount = options.hacount;
  1335. }
  1336. MyLog(LOGA_DEBUG, "Connecting");
  1337. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  1338. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1339. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  1340. goto exit;
  1341. if (response.properties)
  1342. {
  1343. MyLog(LOGA_INFO, "Connack properties:");
  1344. logProperties(response.properties);
  1345. MQTTResponse_free(response);
  1346. }
  1347. property.identifier = MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER;
  1348. property.value.integer4 = 1;
  1349. MQTTProperties_add(&subs_props, &property);
  1350. subopts.noLocal = 1;
  1351. response = MQTTClient_subscribe5(c, test_subscribe_options_globals.topic, 2, &subopts, &subs_props);
  1352. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1353. subs_props.array[0].value.integer4 = 2;
  1354. subopts.noLocal = 0;
  1355. subopts.retainHandling = 1;
  1356. response = MQTTClient_subscribe5(c, "#", 2, &subopts, &subs_props);
  1357. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1358. messages_arrived = 0;
  1359. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  1360. pubmsg.payloadlen = 11;
  1361. pubmsg.retained = 0;
  1362. pubmsg.qos = 2;
  1363. response = MQTTClient_publishMessage5(c, test_subscribe_options_globals.topic, &pubmsg, &dt);
  1364. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1365. /* should get the request */
  1366. while (test_subscribe_options_globals.messages_arrived < 1 && ++count < 10)
  1367. {
  1368. #if defined(_WIN32)
  1369. Sleep(1000);
  1370. #else
  1371. usleep(1000000L);
  1372. #endif
  1373. }
  1374. assert("1 message should have arrived", test_subscribe_options_globals.messages_arrived == 1, "was %d",
  1375. test_subscribe_options_globals.messages_arrived);
  1376. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  1377. MQTTProperties_free(&pubmsg.properties);
  1378. MQTTProperties_free(&subs_props);
  1379. MQTTProperties_free(&connect_props);
  1380. MQTTClient_destroy(&c);
  1381. exit:
  1382. MyLog(LOGA_INFO, "TEST8: test %s. %d tests run, %d failures.",
  1383. (failures == 0) ? "passed" : "failed", tests, failures);
  1384. write_test_result();
  1385. return failures;
  1386. }
  1387. struct
  1388. {
  1389. char* shared_topic;
  1390. char* topic;
  1391. int messages_arrived;
  1392. } test_shared_subscriptions_globals =
  1393. {
  1394. "$share/share_test/#",
  1395. "a",
  1396. 0,
  1397. };
  1398. MQTTClient c, d;
  1399. int test_shared_subscriptions_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message)
  1400. {
  1401. int subsidcount = 0, i = 0, subsid = -1;
  1402. MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.",
  1403. topicName, message->payloadlen, (char*)(message->payload));
  1404. assert("Message structure version should be 1", message->struct_version == 1,
  1405. "message->struct_version was %d", message->struct_version);
  1406. if (message->struct_version == 1)
  1407. {
  1408. const int props_count = 0;
  1409. if (message->properties.count > 0)
  1410. {
  1411. MyLog(LOGA_INFO, "Message properties:");
  1412. logProperties(&message->properties);
  1413. }
  1414. }
  1415. test_shared_subscriptions_globals.messages_arrived++;
  1416. MQTTClient_free(topicName);
  1417. MQTTClient_freeMessage(&message);
  1418. return 1;
  1419. }
  1420. int test_shared_subscriptions(struct Options options)
  1421. {
  1422. int subsqos = 2;
  1423. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5;
  1424. MQTTProperties connect_props = MQTTProperties_initializer;
  1425. MQTTProperties subs_props = MQTTProperties_initializer;
  1426. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  1427. MQTTResponse response = MQTTResponse_initializer;
  1428. MQTTClient_deliveryToken dt;
  1429. int rc = 0;
  1430. int count = 0;
  1431. const int msg_count = 1;
  1432. MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;
  1433. int i;
  1434. MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer;
  1435. fprintf(xml, "<testcase classname=\"test_shared_subscriptions\" name=\"shared subscriptions\"");
  1436. global_start_time = start_clock();
  1437. failures = 0;
  1438. MyLog(LOGA_INFO, "Starting test 8 - shared subscriptions");
  1439. createOpts.MQTTVersion = MQTTVERSION_5;
  1440. rc = MQTTClient_createWithOptions(&c, options.connection, "shared_subscriptions",
  1441. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  1442. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  1443. if (rc != MQTTCLIENT_SUCCESS)
  1444. {
  1445. MQTTClient_destroy(&c);
  1446. goto exit;
  1447. }
  1448. rc = MQTTClient_createWithOptions(&d, options.connection, "shared_subscriptions_1",
  1449. MQTTCLIENT_PERSISTENCE_DEFAULT, NULL, &createOpts);
  1450. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  1451. if (rc != MQTTCLIENT_SUCCESS)
  1452. {
  1453. MQTTClient_destroy(&d);
  1454. goto exit;
  1455. }
  1456. rc = MQTTClient_setCallbacks(c, c, NULL, test_shared_subscriptions_messageArrived, NULL);
  1457. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1458. rc = MQTTClient_setCallbacks(d, d, NULL, test_shared_subscriptions_messageArrived, NULL);
  1459. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  1460. opts.keepAliveInterval = 20;
  1461. opts.cleanstart = 1;
  1462. opts.MQTTVersion = options.MQTTVersion;
  1463. if (options.haconnections != NULL)
  1464. {
  1465. opts.serverURIs = options.haconnections;
  1466. opts.serverURIcount = options.hacount;
  1467. }
  1468. MyLog(LOGA_DEBUG, "Connecting");
  1469. response = MQTTClient_connect5(c, &opts, &connect_props, NULL);
  1470. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1471. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  1472. goto exit;
  1473. if (response.properties)
  1474. {
  1475. MyLog(LOGA_INFO, "Connack properties:");
  1476. logProperties(response.properties);
  1477. MQTTResponse_free(response);
  1478. }
  1479. MyLog(LOGA_DEBUG, "Connecting");
  1480. response = MQTTClient_connect5(d, &opts, &connect_props, NULL);
  1481. assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode);
  1482. if (response.reasonCode != MQTTCLIENT_SUCCESS)
  1483. goto exit;
  1484. if (response.properties)
  1485. {
  1486. MyLog(LOGA_INFO, "Connack properties:");
  1487. logProperties(response.properties);
  1488. MQTTResponse_free(response);
  1489. }
  1490. response = MQTTClient_subscribe5(c, test_shared_subscriptions_globals.shared_topic, 2, &subopts, &subs_props);
  1491. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1492. response = MQTTClient_subscribe5(d, test_shared_subscriptions_globals.shared_topic, 2, &subopts, &subs_props);
  1493. assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode);
  1494. messages_arrived = 0;
  1495. pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
  1496. pubmsg.payloadlen = 11;
  1497. pubmsg.retained = 0;
  1498. pubmsg.qos = 2;
  1499. test_shared_subscriptions_globals.messages_arrived = 0;
  1500. for (i = 0; i < 10; ++i)
  1501. {
  1502. response = MQTTClient_publishMessage5(c, test_shared_subscriptions_globals.topic, &pubmsg, &dt);
  1503. assert("Good rc from publish", response.reasonCode == MQTTREASONCODE_SUCCESS, "rc was %d", response.reasonCode);
  1504. /* should get the request */
  1505. while (test_shared_subscriptions_globals.messages_arrived < i+1 && ++count < 100)
  1506. {
  1507. #if defined(_WIN32)
  1508. Sleep(100);
  1509. #else
  1510. usleep(100000L);
  1511. #endif
  1512. }
  1513. assert("1 message should have arrived", test_shared_subscriptions_globals.messages_arrived == i+1, "was %d",
  1514. test_shared_subscriptions_globals.messages_arrived);
  1515. }
  1516. rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL);
  1517. MQTTProperties_free(&pubmsg.properties);
  1518. MQTTProperties_free(&subs_props);
  1519. MQTTProperties_free(&connect_props);
  1520. MQTTClient_destroy(&c);
  1521. exit:
  1522. MyLog(LOGA_INFO, "TEST9: test %s. %d tests run, %d failures.",
  1523. (failures == 0) ? "passed" : "failed", tests, failures);
  1524. write_test_result();
  1525. return failures;
  1526. }
  1527. int main(int argc, char** argv)
  1528. {
  1529. int rc = 0,
  1530. i;
  1531. int (*tests[])() = {NULL,
  1532. test_client_topic_aliases,
  1533. test_server_topic_aliases,
  1534. test_subscription_ids,
  1535. test_flow_control,
  1536. test_error_reporting,
  1537. test_qos_1_2_errors,
  1538. test_request_response,
  1539. test_subscribe_options,
  1540. test_shared_subscriptions
  1541. };
  1542. xml = fopen("TEST-test1.xml", "w");
  1543. fprintf(xml, "<testsuite name=\"test1\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
  1544. MQTTClient_setTraceCallback(test_flow_control_trace_callback);
  1545. getopts(argc, argv);
  1546. for (i = 0; i < options.iterations; ++i)
  1547. {
  1548. if (options.test_no == 0)
  1549. { /* run all the tests */
  1550. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  1551. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  1552. }
  1553. else
  1554. rc = tests[options.test_no](options); /* run just the selected test */
  1555. }
  1556. if (rc == 0)
  1557. MyLog(LOGA_INFO, "verdict pass");
  1558. else
  1559. MyLog(LOGA_INFO, "verdict fail");
  1560. fprintf(xml, "</testsuite>\n");
  1561. fclose(xml);
  1562. return rc;
  1563. }