sync_client_test.c 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133
  1. /*******************************************************************
  2. Copyright (c) 2013, 2020 IBM Corp.
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v2.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. https://www.eclipse.org/legal/epl-2.0/
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Ian Craggs - initial implementation and/or documentation
  12. *******************************************************************/
  13. #include "MQTTClient.h"
  14. #include <string.h>
  15. #include <stdlib.h>
  16. #if !defined(_WINDOWS)
  17. #include <sys/time.h>
  18. #include <sys/socket.h>
  19. #include <unistd.h>
  20. #include <errno.h>
  21. #else
  22. #include <winsock2.h>
  23. #include <ws2tcpip.h>
  24. #define MAXHOSTNAMELEN 256
  25. #define EAGAIN WSAEWOULDBLOCK
  26. #define EINTR WSAEINTR
  27. #define EINPROGRESS WSAEINPROGRESS
  28. #define EWOULDBLOCK WSAEWOULDBLOCK
  29. #define ENOTCONN WSAENOTCONN
  30. #define ECONNRESET WSAECONNRESET
  31. #endif
  /**
   * Number of elements in an actual array object (not a pointer).
   * The argument is parenthesized before indexing so that any array-valued
   * expression can be passed safely.
   */
  #define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

  /* Plain topic names used by the tests below. */
  char* topics[] = {"TopicA", "TopicA/B", "Topic/C", "TopicA/C", "/TopicA"};

  /* Topic filters containing the '+' and '#' wildcards. */
  char* wildtopics[] = {"TopicA/+", "+/C", "#", "/#", "/+", "+/+", "TopicA/#"};

  /* Single-entry table; not referenced in the visible part of this file. */
  char* nosubscribe_topics[] = {"nosubscribe",};
  36. struct Options
  37. {
  38. char* connection; /**< connection to system under test. */
  39. char* clientid1;
  40. char* clientid2;
  41. char* username;
  42. char* password;
  43. int verbose;
  44. int MQTTVersion;
  45. int iterations;
  46. int run_dollar_topics_test;
  47. int run_subscribe_failure_test;
  48. } options =
  49. {
  50. "tcp://localhost:1883",
  51. "myclientid",
  52. "myclientid2",
  53. NULL,
  54. NULL,
  55. 0,
  56. MQTTVERSION_3_1_1,
  57. 1,
  58. 0,
  59. 0,
  60. };
  /**
   * Print the list of option names to stdout and terminate the program
   * with EXIT_FAILURE. This function does not return.
   */
  void usage(void)
  {
      fputs("options:\n connection, clientid1, clientid2, username, password, MQTTversion, iterations, verbose\n", stdout);
      exit(EXIT_FAILURE);
  }
  66. void getopts(int argc, char** argv)
  67. {
  68. int count = 1;
  69. while (count < argc)
  70. {
  71. if (strcmp(argv[count], "--dollar_topics_test") == 0 || strcmp(argv[count], "--$") == 0)
  72. {
  73. options.run_dollar_topics_test = 1;
  74. printf("Running $ topics test\n");
  75. }
  76. else if (strcmp(argv[count], "--subscribe_failure_test") == 0 || strcmp(argv[count], "-s") == 0)
  77. {
  78. options.run_subscribe_failure_test = 1;
  79. printf("Running subscribe failure test\n");
  80. }
  81. else if (strcmp(argv[count], "--connection") == 0)
  82. {
  83. if (++count < argc)
  84. {
  85. options.connection = argv[count];
  86. printf("Setting connection to %s\n", options.connection);
  87. }
  88. else
  89. usage();
  90. }
  91. else if (strcmp(argv[count], "--clientid1") == 0)
  92. {
  93. if (++count < argc)
  94. {
  95. options.clientid1 = argv[count];
  96. printf("Setting clientid1 to %s\n", options.clientid1);
  97. }
  98. else
  99. usage();
  100. }
  101. else if (strcmp(argv[count], "--clientid2") == 0)
  102. {
  103. if (++count < argc)
  104. {
  105. options.clientid2 = argv[count];
  106. printf("Setting clientid2 to %s\n", options.clientid2);
  107. }
  108. else
  109. usage();
  110. }
  111. else if (strcmp(argv[count], "--username") == 0)
  112. {
  113. if (++count < argc)
  114. {
  115. options.username = argv[count];
  116. printf("Setting username to %s\n", options.username);
  117. }
  118. else
  119. usage();
  120. }
  121. else if (strcmp(argv[count], "--password") == 0)
  122. {
  123. if (++count < argc)
  124. {
  125. options.password = argv[count];
  126. printf("Setting password to %s\n", options.password);
  127. }
  128. else
  129. usage();
  130. }
  131. else if (strcmp(argv[count], "--MQTTversion") == 0)
  132. {
  133. if (++count < argc)
  134. {
  135. options.MQTTVersion = atoi(argv[count]);
  136. printf("Setting MQTT version to %d\n", options.MQTTVersion);
  137. }
  138. else
  139. usage();
  140. }
  141. else if (strcmp(argv[count], "--iterations") == 0)
  142. {
  143. if (++count < argc)
  144. {
  145. options.iterations = atoi(argv[count]);
  146. printf("Setting iterations to %d\n", options.iterations);
  147. }
  148. else
  149. usage();
  150. }
  151. else if (strcmp(argv[count], "--verbose") == 0)
  152. {
  153. options.verbose = 1;
  154. printf("\nSetting verbose on\n");
  155. }
  156. count++;
  157. }
  158. }
  /*
   * Platform layer for the tests:
   *   msleep(ms)       - pause the calling thread for `ms` milliseconds
   *   START_TIME_TYPE  - type returned by start_clock()
   *   start_clock()    - sample the current time
   */
  #if defined(_WIN32) || defined(_WINDOWS)
  #define msleep Sleep
  #define START_TIME_TYPE DWORD
  static DWORD start_time = 0;
  START_TIME_TYPE start_clock(void)
  {
      return GetTickCount();
  }
  #elif defined(AIX)
  /* The tests call msleep() with millisecond values, so it must be defined
     here as well; the previous definition only provided `mqsleep` mapped to
     sleep(), which counts in seconds. */
  #define msleep(A) usleep((A)*1000)
  #define mqsleep sleep /* kept for any code that still uses the old name */
  #define START_TIME_TYPE struct timespec
  START_TIME_TYPE start_clock(void)
  {
      static struct timespec start;
      clock_gettime(CLOCK_REALTIME, &start);
      return start;
  }
  #else
  /* Argument is parenthesized so expressions such as msleep(a + b) scale correctly. */
  #define msleep(A) usleep((A)*1000)
  #define START_TIME_TYPE struct timeval
  /* TODO - unused - remove? static struct timeval start_time; */
  START_TIME_TYPE start_clock(void)
  {
      struct timeval start_time;
      gettimeofday(&start_time, NULL);
      return start_time;
  }
  #endif
  187. #define LOGA_DEBUG 0
  188. #define LOGA_INFO 1
  189. #include <stdarg.h>
  190. #include <time.h>
  191. #include <sys/timeb.h>
  192. void MyLog(int LOGA_level, char* format, ...)
  193. {
  194. static char msg_buf[256];
  195. va_list args;
  196. #if defined(_WIN32) || defined(_WINDOWS)
  197. struct timeb ts;
  198. #else
  199. struct timeval ts;
  200. #endif
  201. struct tm timeinfo;
  202. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  203. return;
  204. #if defined(_WIN32) || defined(_WINDOWS)
  205. ftime(&ts);
  206. localtime_s(&timeinfo, &ts.time);
  207. #else
  208. gettimeofday(&ts, NULL);
  209. localtime_r(&ts.tv_sec, &timeinfo);
  210. #endif
  211. strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
  212. #if defined(_WIN32) || defined(_WINDOWS)
  213. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  214. #else
  215. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000L);
  216. #endif
  217. va_start(args, format);
  218. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  219. va_end(args);
  220. printf("%s\n", msg_buf);
  221. fflush(stdout);
  222. }
  223. int tests = 0;
  224. int failures = 0;
  225. void myassert(char* filename, int lineno, char* description, int value, char* format, ...)
  226. {
  227. ++tests;
  228. if (!value)
  229. {
  230. int count;
  231. va_list args;
  232. ++failures;
  233. printf("Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description);
  234. va_start(args, format);
  235. count = vprintf(format, args);
  236. va_end(args);
  237. if (count)
  238. printf("\n");
  239. //cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
  240. // description, filename, lineno);
  241. }
  242. else
  243. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description);
  244. }
  245. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  246. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  247. typedef struct
  248. {
  249. char* topicName;
  250. int topicLen;
  251. MQTTClient_message* m;
  252. } messageStruct;
  253. messageStruct messagesArrived[1000];
  254. int messageCount = 0;
  255. int messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* m)
  256. {
  257. messagesArrived[messageCount].topicName = topicName;
  258. messagesArrived[messageCount].topicLen = topicLen;
  259. messagesArrived[messageCount++].m = m;
  260. MyLog(LOGA_DEBUG, "Callback: %d message received on topic %s is %.*s.",
  261. messageCount, topicName, m->payloadlen, (char*)(m->payload));
  262. return 1;
  263. }
  264. void clearMessages(void)
  265. {
  266. int i;
  267. for (i = 0; i < messageCount; ++i)
  268. {
  269. MQTTClient_free(messagesArrived[i].topicName);
  270. MQTTClient_freeMessage(&messagesArrived[i].m);
  271. }
  272. messageCount = 0;
  273. }
  274. void cleanup(void)
  275. {
  276. // clean all client state
  277. char* clientids[] = {options.clientid1, options.clientid2};
  278. int i, rc;
  279. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  280. MQTTClient aclient;
  281. MyLog(LOGA_INFO, "Cleaning up");
  282. opts.keepAliveInterval = 20;
  283. opts.cleansession = 1;
  284. opts.username = options.username;
  285. opts.password = options.password;
  286. opts.MQTTVersion = options.MQTTVersion;
  287. for (i = 0; i < 2; ++i)
  288. {
  289. rc = MQTTClient_create(&aclient, options.connection, clientids[i], MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  290. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  291. rc = MQTTClient_connect(aclient, &opts);
  292. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  293. rc = MQTTClient_disconnect(aclient, 100);
  294. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  295. MQTTClient_destroy(&aclient);
  296. }
  297. // clean retained messages
  298. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  299. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  300. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  301. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  302. rc = MQTTClient_connect(aclient, &opts);
  303. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  304. rc = MQTTClient_subscribe(aclient, "#", 0);
  305. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  306. msleep(2000); // wait for all retained messages to arrive
  307. rc = MQTTClient_unsubscribe(aclient, "#");
  308. assert("Good rc from unsubscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  309. for (i = 0; i < messageCount; ++i)
  310. {
  311. if (messagesArrived[i].m->retained)
  312. {
  313. MyLog(LOGA_INFO, "Deleting retained message for topic %s", (char*)messagesArrived[i].topicName);
  314. rc = MQTTClient_publish(aclient, messagesArrived[i].topicName, 0, "", 0, 1, NULL);
  315. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  316. }
  317. }
  318. rc = MQTTClient_disconnect(aclient, 100);
  319. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  320. MQTTClient_destroy(&aclient);
  321. clearMessages();
  322. MyLog(LOGA_INFO, "Finished cleaning up");
  323. }
  324. int basic_test(void)
  325. {
  326. int i, rc;
  327. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  328. MQTTClient aclient;
  329. MyLog(LOGA_INFO, "Starting basic test");
  330. tests = failures = 0;
  331. opts.keepAliveInterval = 20;
  332. opts.cleansession = 1;
  333. opts.username = options.username;
  334. opts.password = options.password;
  335. opts.MQTTVersion = options.MQTTVersion;
  336. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  337. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  338. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  339. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  340. rc = MQTTClient_connect(aclient, &opts);
  341. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  342. rc = MQTTClient_disconnect(aclient, 100);
  343. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  344. rc = MQTTClient_connect(aclient, &opts);
  345. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  346. rc = MQTTClient_subscribe(aclient, topics[0], 0);
  347. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  348. rc = MQTTClient_publish(aclient, topics[0], 5, "qos 0", 0, 0, NULL);
  349. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  350. rc = MQTTClient_publish(aclient, topics[0], 5, "qos 1", 1, 0, NULL);
  351. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  352. rc = MQTTClient_publish(aclient, topics[0], 5, "qos 2", 2, 0, NULL);
  353. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  354. msleep(1000);
  355. rc = MQTTClient_disconnect(aclient, 10000);
  356. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  357. assert("3 Messages received", messageCount == 3, "messageCount was %d", messageCount);
  358. clearMessages();
  359. /*opts.MQTTVersion = MQTTVERSION_3_1;
  360. rc = MQTTClient_connect(aclient, &opts); // should fail - wrong protocol version
  361. assert("Bad rc from connect", rc == MQTTCLIENT_FAILURE, "rc was %d", rc);*/
  362. MQTTClient_destroy(&aclient);
  363. MyLog(LOGA_INFO, "Basic test %s", (failures == 0) ? "succeeded" : "failed");
  364. return failures;
  365. }
  366. int offline_message_queueing_test(void)
  367. {
  368. int i, rc;
  369. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  370. MQTTClient aclient;
  371. MQTTClient bclient;
  372. MyLog(LOGA_INFO, "Offline message queueing test");
  373. tests = failures = 0;
  374. opts.keepAliveInterval = 20;
  375. opts.cleansession = 0;
  376. opts.username = options.username;
  377. opts.password = options.password;
  378. opts.MQTTVersion = options.MQTTVersion;
  379. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  380. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  381. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  382. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  383. rc = MQTTClient_connect(aclient, &opts);
  384. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  385. rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
  386. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  387. rc = MQTTClient_disconnect(aclient, 100);
  388. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  389. rc = MQTTClient_create(&bclient, options.connection, options.clientid2, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  390. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  391. opts.cleansession = 1;
  392. rc = MQTTClient_connect(bclient, &opts);
  393. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  394. rc = MQTTClient_publish(bclient, topics[1], 5, "qos 0", 0, 0, NULL);
  395. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  396. rc = MQTTClient_publish(bclient, topics[2], 5, "qos 1", 1, 0, NULL);
  397. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  398. rc = MQTTClient_publish(bclient, topics[3], 5, "qos 2", 2, 0, NULL);
  399. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  400. msleep(2000);
  401. rc = MQTTClient_disconnect(bclient, 100);
  402. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  403. MQTTClient_destroy(&bclient);
  404. opts.cleansession = 0;
  405. rc = MQTTClient_connect(aclient, &opts);
  406. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  407. msleep(1000); // receive the queued messages
  408. rc = MQTTClient_disconnect(aclient, 100);
  409. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  410. MQTTClient_destroy(&aclient);
  411. assert("2 or 3 messages received", messageCount == 3 || messageCount == 2, "messageCount was %d", messageCount);
  412. MyLog(LOGA_INFO, "This server %s queueing QoS 0 messages for offline clients", (messageCount == 3) ? "is" : "is not");
  413. clearMessages();
  414. MyLog(LOGA_INFO, "Offline message queueing test %s", (failures == 0) ? "succeeded" : "failed");
  415. return failures;
  416. }
  417. int retained_message_test(void)
  418. {
  419. int i, rc;
  420. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  421. MQTTClient aclient;
  422. MyLog(LOGA_INFO, "Retained message test");
  423. tests = failures = 0;
  424. opts.keepAliveInterval = 20;
  425. opts.cleansession = 1;
  426. opts.username = options.username;
  427. opts.password = options.password;
  428. opts.MQTTVersion = options.MQTTVersion;
  429. assert("0 messages received", messageCount == 0, "messageCount was %d", messageCount);
  430. // set retained messages
  431. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  432. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  433. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  434. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  435. rc = MQTTClient_connect(aclient, &opts);
  436. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  437. rc = MQTTClient_publish(aclient, topics[1], 5, "qos 0", 0, 1, NULL);
  438. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  439. rc = MQTTClient_publish(aclient, topics[2], 5, "qos 1", 1, 1, NULL);
  440. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  441. rc = MQTTClient_publish(aclient, topics[3], 5, "qos 2", 2, 1, NULL);
  442. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  443. msleep(1000);
  444. rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
  445. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  446. msleep(2000);
  447. rc = MQTTClient_disconnect(aclient, 100);
  448. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  449. assert("3 messages received", messageCount == 3, "messageCount was %d", messageCount);
  450. for (i = 0; i < messageCount; ++i)
  451. {
  452. assert("messages should be retained", messagesArrived[i].m->retained, "retained was %d",
  453. messagesArrived[i].m->retained);
  454. MQTTClient_free(messagesArrived[i].topicName);
  455. MQTTClient_freeMessage(&messagesArrived[i].m);
  456. }
  457. messageCount = 0;
  458. // clear retained messages
  459. rc = MQTTClient_connect(aclient, &opts);
  460. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  461. rc = MQTTClient_publish(aclient, topics[1], 0, "", 0, 1, NULL);
  462. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  463. rc = MQTTClient_publish(aclient, topics[2], 0, "", 1, 1, NULL);
  464. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  465. rc = MQTTClient_publish(aclient, topics[3], 0, "", 2, 1, NULL);
  466. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  467. msleep(200); // wait for QoS 2 exchange to be completed
  468. rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
  469. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  470. msleep(200);
  471. rc = MQTTClient_disconnect(aclient, 100);
  472. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  473. assert("0 messages received", messageCount == 0, "messageCount was %d", messageCount);
  474. MQTTClient_destroy(&aclient);
  475. MyLog(LOGA_INFO, "Retained message test %s", (failures == 0) ? "succeeded" : "failed");
  476. return failures;
  477. }
  /* Failure value returned by the socket calls used below. Defined only when
     a platform header has not already provided it, and parenthesized so it is
     safe inside larger expressions. */
  #if !defined(SOCKET_ERROR)
  #define SOCKET_ERROR (-1)
  #endif

  /**
   * Report the error left behind by a failed socket call.
   *
   * The error code is captured once, before anything is printed, so that the
   * value returned is the one that was tested. Codes EINTR, EAGAIN,
   * EINPROGRESS and EWOULDBLOCK are not reported; ENOTCONN and ECONNRESET are
   * not reported when aString is "shutdown".
   *
   * @param aString name of the operation that failed, used in the message
   * @param sock    socket the operation was applied to
   * @return the captured error code
   */
  int test6_socket_error(char* aString, int sock)
  {
  #if defined(_WIN32)
      const int err = WSAGetLastError();
  #else
      const int err = errno;
  #endif

      if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
      {
          if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
              printf("Socket error %d in %s for socket %d", err, aString, sock);
      }
      return err;
  }
  494. int test6_socket_close(int socket)
  495. {
  496. int rc;
  497. #if defined(_WIN32)
  498. if (shutdown(socket, SD_BOTH) == SOCKET_ERROR)
  499. test6_socket_error("shutdown", socket);
  500. if ((rc = closesocket(socket)) == SOCKET_ERROR)
  501. test6_socket_error("close", socket);
  502. #else
  503. if (shutdown(socket, SHUT_RDWR) == SOCKET_ERROR)
  504. test6_socket_error("shutdown", socket);
  505. if ((rc = close(socket)) == SOCKET_ERROR)
  506. test6_socket_error("close", socket);
  507. #endif
  508. return rc;
  509. }
  /**
   * Socket-level handles of a client.
   * NOTE(review): presumably mirrors a structure inside the client library,
   * so member order and types must stay as they are - confirm against the
   * library sources.
   */
  typedef struct
  {
      int socket;         /* socket descriptor; will_message_test() closes it directly */
      time_t lastContact;
  #if defined(OPENSSL)
      SSL* ssl;
      SSL_CTX* ctx;
  #endif
  } networkHandles;
  519. typedef struct
  520. {
  521. char* clientID; /**< the string id of the client */
  522. char* username; /**< MQTT v3.1 user name */
  523. char* password; /**< MQTT v3.1 password */
  524. unsigned int cleansession : 1; /**< MQTT clean session flag */
  525. unsigned int connected : 1; /**< whether it is currently connected */
  526. unsigned int good : 1; /**< if we have an error on the socket we turn this off */
  527. unsigned int ping_outstanding : 1;
  528. int connect_state : 4;
  529. networkHandles net;
  530. /* ... */
  531. } Clients;
  532. typedef struct
  533. {
  534. char* serverURI;
  535. Clients* c;
  536. MQTTClient_connectionLost* cl;
  537. MQTTClient_messageArrived* ma;
  538. MQTTClient_deliveryComplete* dc;
  539. void* context;
  540. int connect_sem;
  541. int rc; /* getsockopt return code in connect */
  542. int connack_sem;
  543. int suback_sem;
  544. int unsuback_sem;
  545. void* pack;
  546. } MQTTClients;
  547. int will_message_test(void)
  548. {
  549. int i, rc, count = 0;
  550. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  551. MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
  552. MQTTClient aclient, bclient;
  553. MyLog(LOGA_INFO, "Will message test");
  554. tests = failures = 0;
  555. opts.keepAliveInterval = 2;
  556. opts.cleansession = 1;
  557. opts.username = options.username;
  558. opts.password = options.password;
  559. opts.MQTTVersion = options.MQTTVersion;
  560. opts.will = &wopts;
  561. opts.will->message = "client not disconnected";
  562. opts.will->qos = 1;
  563. opts.will->retained = 0;
  564. opts.will->topicName = topics[2];
  565. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  566. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  567. rc = MQTTClient_connect(aclient, &opts);
  568. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  569. rc = MQTTClient_create(&bclient, options.connection, options.clientid2, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  570. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  571. rc = MQTTClient_setCallbacks(bclient, NULL, NULL, messageArrived, NULL);
  572. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  573. opts.keepAliveInterval = 20;
  574. opts.will = NULL;
  575. rc = MQTTClient_connect(bclient, &opts);
  576. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  577. rc = MQTTClient_subscribe(bclient, topics[2], 2);
  578. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  579. msleep(100);
  580. test6_socket_close(((MQTTClients*)aclient)->c->net.socket);
  581. while (messageCount == 0 && ++count < 10)
  582. msleep(1000);
  583. rc = MQTTClient_disconnect(bclient, 100);
  584. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  585. MQTTClient_destroy(&bclient);
  586. assert("will message received", messageCount == 1, "messageCount was %d", messageCount);
  587. rc = MQTTClient_disconnect(aclient, 100);
  588. MQTTClient_destroy(&aclient);
  589. MyLog(LOGA_INFO, "Will message test %s", (failures == 0) ? "succeeded" : "failed");
  590. return failures;
  591. }
  592. int overlapping_subscriptions_test(void)
  593. {
  594. /* overlapping subscriptions. When there is more than one matching subscription for the same client for a topic,
  595. the server may send back one message with the highest QoS of any matching subscription, or one message for
  596. each subscription with a matching QoS. */
  597. int i, rc;
  598. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  599. MQTTClient aclient;
  600. char* topicList[] = {wildtopics[6], wildtopics[0]};
  601. int qosList[] = {2, 1};
  602. MyLog(LOGA_INFO, "Starting overlapping subscriptions test");
  603. clearMessages();
  604. tests = failures = 0;
  605. opts.keepAliveInterval = 20;
  606. opts.cleansession = 1;
  607. opts.username = options.username;
  608. opts.password = options.password;
  609. opts.MQTTVersion = options.MQTTVersion;
  610. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  611. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  612. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  613. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  614. rc = MQTTClient_connect(aclient, &opts);
  615. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  616. rc = MQTTClient_subscribeMany(aclient, 2, topicList, qosList);
  617. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  618. rc = MQTTClient_publish(aclient, topics[3], strlen("overlapping topic filters") + 1,
  619. "overlapping topic filters", 2, 0, NULL);
  620. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  621. msleep(1000);
  622. assert("1 or 2 messages received", messageCount == 1 || messageCount == 2, "messageCount was %d", messageCount);
  623. if (messageCount == 1)
  624. {
  625. MyLog(LOGA_INFO, "This server is publishing one message for all matching overlapping subscriptions, not one for each.");
  626. assert("QoS should be 2", messagesArrived[0].m->qos == 2, "QoS was %d", messagesArrived[0].m->qos);
  627. }
  628. else
  629. {
  630. MyLog(LOGA_INFO, "This server is publishing one message per each matching overlapping subscription.");
  631. assert1("QoSs should be 1 and 2",
  632. (messagesArrived[0].m->qos == 2 && messagesArrived[1].m->qos == 1) ||
  633. (messagesArrived[0].m->qos == 1 && messagesArrived[1].m->qos == 2),
  634. "QoSs were %d %d", messagesArrived[0].m->qos, messagesArrived[1].m->qos);
  635. }
  636. rc = MQTTClient_disconnect(aclient, 100);
  637. MQTTClient_destroy(&aclient);
  638. MyLog(LOGA_INFO, "Overlapping subscription test %s", (failures == 0) ? "succeeded" : "failed");
  639. return failures;
  640. }
  641. int keepalive_test(void)
  642. {
  643. /* keepalive processing. We should be kicked off by the server if we don't send or receive any data, and don't send
  644. any pings either. */
  645. int i, rc, count = 0;
  646. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  647. MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer;
  648. MQTTClient aclient, bclient;
  649. MyLog(LOGA_INFO, "Starting keepalive test");
  650. tests = failures = 0;
  651. clearMessages();
  652. opts.cleansession = 1;
  653. opts.username = options.username;
  654. opts.password = options.password;
  655. opts.MQTTVersion = options.MQTTVersion;
  656. opts.will = &wopts;
  657. opts.will->message = "keepalive expiry";
  658. opts.will->qos = 1;
  659. opts.will->retained = 0;
  660. opts.will->topicName = topics[4];
  661. opts.keepAliveInterval = 20;
  662. rc = MQTTClient_create(&bclient, options.connection, options.clientid2, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  663. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  664. rc = MQTTClient_setCallbacks(bclient, NULL, NULL, messageArrived, NULL);
  665. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  666. rc = MQTTClient_connect(bclient, &opts);
  667. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  668. rc = MQTTClient_subscribe(bclient, topics[4], 2);
  669. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  670. opts.keepAliveInterval = 2;
  671. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  672. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  673. rc = MQTTClient_connect(aclient, &opts);
  674. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  675. while (messageCount == 0 && ++count < 20)
  676. msleep(1000);
  677. rc = MQTTClient_disconnect(bclient, 100);
  678. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  679. assert("Should have will message", messageCount == 1, "messageCount was %d", messageCount);
  680. rc = MQTTClient_disconnect(aclient, 100);
  681. MQTTClient_destroy(&aclient);
  682. MyLog(LOGA_INFO, "Keepalive test %s", (failures == 0) ? "succeeded" : "failed");
  683. return failures;
  684. }
  685. int redelivery_on_reconnect_test(void)
  686. {
  687. /* redelivery on reconnect. When a QoS 1 or 2 exchange has not been completed, the server should retry the
  688. appropriate MQTT packets */
  689. int i, rc, count = 0;
  690. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  691. MQTTClient aclient;
  692. MyLog(LOGA_INFO, "Starting redelivery on reconnect test");
  693. tests = failures = 0;
  694. clearMessages();
  695. opts.keepAliveInterval = 0;
  696. opts.cleansession = 0;
  697. opts.username = options.username;
  698. opts.password = options.password;
  699. opts.MQTTVersion = options.MQTTVersion;
  700. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  701. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  702. rc = MQTTClient_connect(aclient, &opts);
  703. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  704. rc = MQTTClient_subscribe(aclient, wildtopics[6], 2);
  705. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  706. MQTTClient_yield();
  707. // no background processing because no callback has been set
  708. rc = MQTTClient_publish(aclient, topics[1], 6, "qos 1", 2, 0, NULL);
  709. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  710. rc = MQTTClient_publish(aclient, topics[3], 6, "qos 2", 2, 0, NULL);
  711. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  712. rc = MQTTClient_disconnect(aclient, 0);
  713. assert("No messages should have been received yet", messageCount == 0, "messageCount was %d", messageCount);
  714. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  715. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  716. rc = MQTTClient_connect(aclient, &opts);
  717. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  718. while (messageCount < 2 && ++count < 5)
  719. msleep(1000);
  720. assert("Should have 2 messages", messageCount == 2, "messageCount was %d", messageCount);
  721. rc = MQTTClient_disconnect(aclient, 100);
  722. MQTTClient_destroy(&aclient);
  723. MyLog(LOGA_INFO, "Redelivery on reconnect test %s", (failures == 0) ? "succeeded" : "failed");
  724. return failures;
  725. }
  726. int zero_length_clientid_test(void)
  727. {
  728. int i, rc, count = 0;
  729. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  730. MQTTClient aclient;
  731. MyLog(LOGA_INFO, "Starting zero length clientid test");
  732. tests = failures = 0;
  733. clearMessages();
  734. opts.keepAliveInterval = 0;
  735. opts.cleansession = 0;
  736. opts.username = options.username;
  737. opts.password = options.password;
  738. opts.MQTTVersion = options.MQTTVersion;
  739. rc = MQTTClient_create(&aclient, options.connection, "", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  740. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  741. rc = MQTTClient_connect(aclient, &opts);
  742. assert("rc 2 from connect", rc == 2, "rc was %d", rc); // this should always fail
  743. opts.cleansession = 1;
  744. rc = MQTTClient_connect(aclient, &opts);
  745. assert("Connack rc should be 0 or 2", rc == MQTTCLIENT_SUCCESS || rc == 2, "rc was %d", rc);
  746. MyLog(LOGA_INFO, "This server %s support zero length clientids", (rc == 2) ? "does not" : "does");
  747. if (rc == MQTTCLIENT_SUCCESS)
  748. rc = MQTTClient_disconnect(aclient, 100);
  749. MQTTClient_destroy(&aclient);
  750. MyLog(LOGA_INFO, "Zero length clientid test %s", (failures == 0) ? "succeeded" : "failed");
  751. return failures;
  752. }
  753. int dollar_topics_test(void)
  754. {
  755. /* $ topics. The specification says that a topic filter which starts with a wildcard does not match topic names that
  756. begin with a $. Publishing to a topic which starts with a $ may not be allowed on some servers (which is entirely valid),
  757. so this test will not work and should be omitted in that case.
  758. */
  759. int i, rc, count = 0;
  760. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  761. MQTTClient aclient;
  762. char dollartopic[20];
  763. MyLog(LOGA_INFO, "Starting $ topics test");
  764. sprintf(dollartopic, "$%s", topics[1]);
  765. clearMessages();
  766. opts.keepAliveInterval = 5;
  767. opts.cleansession = 1;
  768. opts.username = options.username;
  769. opts.password = options.password;
  770. opts.MQTTVersion = options.MQTTVersion;
  771. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  772. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  773. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  774. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  775. rc = MQTTClient_connect(aclient, &opts);
  776. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  777. rc = MQTTClient_subscribe(aclient, wildtopics[5], 2);
  778. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  779. msleep(1000); // wait for any retained messages, hopefully
  780. clearMessages();
  781. rc = MQTTClient_publish(aclient, topics[1], 20, "not sent to dollar topic", 1, 0, NULL);
  782. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  783. rc = MQTTClient_publish(aclient, dollartopic, 20, "sent to dollar topic", 1, 0, NULL);
  784. assert("Good rc from publish", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  785. msleep(1000);
  786. assert("Should have 1 message", messageCount == 1, "messageCount was %d", messageCount);
  787. rc = MQTTClient_disconnect(aclient, 100);
  788. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  789. MQTTClient_destroy(&aclient);
  790. MyLog(LOGA_INFO, "$ topics test %s", (failures == 0) ? "succeeded" : "failed");
  791. return failures;
  792. }
  793. int subscribe_failure_test(void)
  794. {
  795. /* Subscribe failure. A new feature of MQTT 3.1.1 is the ability to send back negative reponses to subscribe
  796. requests. One way of doing this is to subscribe to a topic which is not allowed to be subscribed to.
  797. */
  798. int i, rc, count = 0;
  799. MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
  800. MQTTClient aclient;
  801. int subqos = 2;
  802. MyLog(LOGA_INFO, "Starting subscribe failure test");
  803. clearMessages();
  804. opts.keepAliveInterval = 5;
  805. opts.cleansession = 1;
  806. opts.username = options.username;
  807. opts.password = options.password;
  808. opts.MQTTVersion = options.MQTTVersion;
  809. rc = MQTTClient_create(&aclient, options.connection, options.clientid1, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  810. assert("good rc from create", rc == MQTTCLIENT_SUCCESS, "rc was %d\n", rc);
  811. rc = MQTTClient_setCallbacks(aclient, NULL, NULL, messageArrived, NULL);
  812. assert("Good rc from setCallbacks", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  813. rc = MQTTClient_connect(aclient, &opts);
  814. assert("Good rc from connect", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  815. rc = MQTTClient_subscribeMany(aclient, 1, &nosubscribe_topics[0], &subqos);
  816. assert("Good rc from subscribe", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  817. assert("0x80 rc from subscribe", subqos == 0x80, "subqos was %d", subqos);
  818. rc = MQTTClient_disconnect(aclient, 100);
  819. assert("Disconnect successful", rc == MQTTCLIENT_SUCCESS, "rc was %d", rc);
  820. MQTTClient_destroy(&aclient);
  821. MyLog(LOGA_INFO, "Subscribe failure test %s", (failures == 0) ? "succeeded" : "failed");
  822. return failures;
  823. }
  824. int main(int argc, char** argv)
  825. {
  826. int i;
  827. int all_failures = 0;
  828. getopts(argc, argv);
  829. for (i = 0; i < options.iterations; ++i)
  830. {
  831. cleanup();
  832. all_failures += basic_test() +
  833. offline_message_queueing_test() +
  834. retained_message_test() +
  835. will_message_test() +
  836. overlapping_subscriptions_test() +
  837. keepalive_test() +
  838. redelivery_on_reconnect_test() +
  839. zero_length_clientid_test();
  840. if (options.run_dollar_topics_test)
  841. all_failures += dollar_topics_test();
  842. if (options.run_subscribe_failure_test)
  843. all_failures += subscribe_failure_test();
  844. }
  845. MyLog(LOGA_INFO, "Test suite %s", (all_failures == 0) ? "succeeded" : "failed");
  846. }