123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509 |
- /*******************************************************************************
- * Copyright (c) 2012, 2018 IBM Corp., and others
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial contribution
- * Guilherme Maciel Ferreira - add keep alive option
- * Ian Craggs - add full capability
- *******************************************************************************/
- #include "MQTTAsync.h"
- #include "pubsub_opts.h"
- #include <stdio.h>
- #include <signal.h>
- #include <string.h>
- #include <stdlib.h>
- #if defined(WIN32)
- #include <windows.h>
- #define sleep Sleep
- #else
- #include <unistd.h>
- #include <sys/time.h>
- #include <unistd.h>
- #endif
- #if defined(_WRS_KERNEL)
- #include <OsWrapper.h>
- #endif
- volatile int toStop = 0;
- struct pubsub_opts opts =
- {
- 1, 0, 0, 0, "\n", 100, /* debug/app options */
- NULL, NULL, 1, 0, 0, /* message options */
- MQTTVERSION_DEFAULT, NULL, "paho-c-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */
- NULL, NULL, 0, 0, /* will options */
- 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */
- 0, {NULL, NULL}, /* MQTT V5 options */
- };
- MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
- MQTTProperty property;
- MQTTProperties props = MQTTProperties_initializer;
- void mysleep(int ms)
- {
- #if defined(WIN32)
- Sleep(ms);
- #else
- usleep(ms * 1000);
- #endif
- }
- void cfinish(int sig)
- {
- signal(SIGINT, NULL);
- toStop = 1;
- }
- int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
- {
- /* not expecting any messages */
- return 1;
- }
- static int disconnected = 0;
- void onDisconnect5(void* context, MQTTAsync_successData5* response)
- {
- disconnected = 1;
- }
- void onDisconnect(void* context, MQTTAsync_successData* response)
- {
- disconnected = 1;
- }
- static int connected = 0;
- void myconnect(MQTTAsync client);
- int mypublish(MQTTAsync client, int datalen, char* data);
- void onConnectFailure5(void* context, MQTTAsync_failureData5* response)
- {
- fprintf(stderr, "Connect failed, rc %s reason code %s\n",
- MQTTAsync_strerror(response->code),
- MQTTReasonCode_toString(response->reasonCode));
- connected = -1;
- MQTTAsync client = (MQTTAsync)context;
- }
- void onConnectFailure(void* context, MQTTAsync_failureData* response)
- {
- fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none");
- connected = -1;
- MQTTAsync client = (MQTTAsync)context;
- }
- void onConnect5(void* context, MQTTAsync_successData5* response)
- {
- MQTTAsync client = (MQTTAsync)context;
- int rc = 0;
- if (opts.verbose)
- printf("Connected\n");
- if (opts.null_message == 1)
- rc = mypublish(client, 0, "");
- else if (opts.message)
- rc = mypublish(client, (int)strlen(opts.message), opts.message);
- else if (opts.filename)
- {
- int data_len = 0;
- char* buffer = readfile(&data_len, &opts);
- if (buffer == NULL)
- toStop = 1;
- else
- {
- rc = mypublish(client, data_len, buffer);
- free(buffer);
- }
- }
- connected = 1;
- }
- void onConnect(void* context, MQTTAsync_successData* response)
- {
- MQTTAsync client = (MQTTAsync)context;
- int rc = 0;
- if (opts.verbose)
- printf("Connected\n");
- if (opts.null_message == 1)
- rc = mypublish(client, 0, "");
- else if (opts.message)
- rc = mypublish(client, (int)strlen(opts.message), opts.message);
- else if (opts.filename)
- {
- int data_len = 0;
- char* buffer = readfile(&data_len, &opts);
- if (buffer == NULL)
- toStop = 1;
- else
- {
- rc = mypublish(client, data_len, buffer);
- free(buffer);
- }
- }
- connected = 1;
- }
- static int published = 0;
- void onPublishFailure5(void* context, MQTTAsync_failureData5* response)
- {
- if (opts.verbose)
- fprintf(stderr, "Publish failed, rc %s reason code %s\n",
- MQTTAsync_strerror(response->code),
- MQTTReasonCode_toString(response->reasonCode));
- published = -1;
- }
- void onPublishFailure(void* context, MQTTAsync_failureData* response)
- {
- if (opts.verbose)
- fprintf(stderr, "Publish failed, rc %s\n", MQTTAsync_strerror(response->code));
- published = -1;
- }
- void onPublish5(void* context, MQTTAsync_successData5* response)
- {
- if (opts.verbose)
- printf("Publish succeeded, reason code %s\n",
- MQTTReasonCode_toString(response->reasonCode));
- if (opts.null_message || opts.message || opts.filename)
- toStop = 1;
- published = 1;
- }
- void onPublish(void* context, MQTTAsync_successData* response)
- {
- if (opts.verbose)
- printf("Publish succeeded\n");
- if (opts.null_message || opts.message || opts.filename)
- toStop = 1;
- published = 1;
- }
- static int onSSLError(const char *str, size_t len, void *context)
- {
- MQTTAsync client = (MQTTAsync)context;
- return fprintf(stderr, "SSL error: %s\n", str);
- }
- static unsigned int onPSKAuth(const char* hint,
- char* identity,
- unsigned int max_identity_len,
- unsigned char* psk,
- unsigned int max_psk_len,
- void* context)
- {
- int psk_len;
- int k, n;
- int rc = 0;
- struct pubsub_opts* opts = context;
- printf("Trying TLS-PSK auth with hint: %s\n", hint);
- if (opts->psk == NULL || opts->psk_identity == NULL)
- {
- printf("No PSK entered\n");
- goto exit;
- }
- /* psk should be array of bytes. This is a quick and dirty way to
- * convert hex to bytes without input validation */
- psk_len = strlen(opts->psk) / 2;
- if (psk_len > max_psk_len)
- {
- printf("PSK too long\n");
- goto exit;
- }
- for (k=0, n=0; k < psk_len; k++, n += 2)
- {
- sscanf(&opts->psk[n], "%2hhx", &psk[k]);
- }
- /* identity should be NULL terminated string */
- strncpy(identity, opts->psk_identity, max_identity_len);
- if (identity[max_identity_len - 1] != '\0')
- {
- printf("Identity too long\n");
- goto exit;
- }
- /* Function should return length of psk on success. */
- rc = psk_len;
- exit:
- return rc;
- }
- void myconnect(MQTTAsync client)
- {
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
- MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
- int rc = 0;
- if (opts.verbose)
- printf("Connecting\n");
- if (opts.MQTTVersion == MQTTVERSION_5)
- {
- MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
- conn_opts = conn_opts5;
- conn_opts.onSuccess5 = onConnect5;
- conn_opts.onFailure5 = onConnectFailure5;
- conn_opts.cleanstart = 1;
- }
- else
- {
- conn_opts.onSuccess = onConnect;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.cleansession = 1;
- }
- conn_opts.keepAliveInterval = opts.keepalive;
- conn_opts.username = opts.username;
- conn_opts.password = opts.password;
- conn_opts.MQTTVersion = opts.MQTTVersion;
- conn_opts.context = client;
- conn_opts.automaticReconnect = 1;
- if (opts.will_topic) /* will options */
- {
- will_opts.message = opts.will_payload;
- will_opts.topicName = opts.will_topic;
- will_opts.qos = opts.will_qos;
- will_opts.retained = opts.will_retain;
- conn_opts.will = &will_opts;
- }
- if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
- strncmp(opts.connection, "wss://", 6) == 0))
- {
- if (opts.insecure)
- ssl_opts.verify = 0;
- else
- ssl_opts.verify = 1;
- ssl_opts.CApath = opts.capath;
- ssl_opts.keyStore = opts.cert;
- ssl_opts.trustStore = opts.cafile;
- ssl_opts.privateKey = opts.key;
- ssl_opts.privateKeyPassword = opts.keypass;
- ssl_opts.enabledCipherSuites = opts.ciphers;
- ssl_opts.ssl_error_cb = onSSLError;
- ssl_opts.ssl_error_context = client;
- ssl_opts.ssl_psk_cb = onPSKAuth;
- ssl_opts.ssl_psk_context = &opts;
- conn_opts.ssl = &ssl_opts;
- }
- connected = 0;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
- {
- fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
- exit(EXIT_FAILURE);
- }
- }
- int mypublish(MQTTAsync client, int datalen, char* data)
- {
- int rc;
- if (opts.verbose)
- printf("Publishing data of length %d\n", datalen);
- rc = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts);
- if (opts.verbose && rc != MQTTASYNC_SUCCESS && !opts.quiet)
- fprintf(stderr, "Error from MQTTAsync_send: %s\n", MQTTAsync_strerror(rc));
- return rc;
- }
- void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
- {
- fprintf(stderr, "Trace : %d, %s\n", level, message);
- }
- int main(int argc, char** argv)
- {
- MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
- MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
- MQTTAsync client;
- char* buffer = NULL;
- char* url = NULL;
- int rc = 0;
- const char* version = NULL;
- const char* program_name = "paho_c_pub";
- MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo();
- #if !defined(WIN32)
- struct sigaction sa;
- #endif
- if (argc < 2)
- usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
- if (getopts(argc, argv, &opts) != 0)
- usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
- if (opts.connection)
- url = opts.connection;
- else
- {
- url = malloc(100);
- sprintf(url, "%s:%s", opts.host, opts.port);
- }
- if (opts.verbose)
- printf("URL is %s\n", url);
- if (opts.tracelevel > 0)
- {
- MQTTAsync_setTraceCallback(trace_callback);
- MQTTAsync_setTraceLevel(opts.tracelevel);
- }
- create_opts.sendWhileDisconnected = 1;
- if (opts.MQTTVersion >= MQTTVERSION_5)
- create_opts.MQTTVersion = MQTTVERSION_5;
- rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
- if (rc != MQTTASYNC_SUCCESS)
- {
- if (!opts.quiet)
- fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc));
- exit(EXIT_FAILURE);
- }
- #if defined(WIN32)
- signal(SIGINT, cfinish);
- signal(SIGTERM, cfinish);
- #else
- memset(&sa, 0, sizeof(struct sigaction));
- sa.sa_handler = cfinish;
- sa.sa_flags = 0;
- sigaction(SIGINT, &sa, NULL);
- sigaction(SIGTERM, &sa, NULL);
- #endif
- rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);
- if (rc != MQTTASYNC_SUCCESS)
- {
- if (!opts.quiet)
- fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc));
- exit(EXIT_FAILURE);
- }
- if (opts.MQTTVersion >= MQTTVERSION_5)
- {
- pub_opts.onSuccess5 = onPublish5;
- pub_opts.onFailure5 = onPublishFailure5;
- if (opts.message_expiry > 0)
- {
- property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL;
- property.value.integer4 = opts.message_expiry;
- MQTTProperties_add(&props, &property);
- }
- if (opts.user_property.name)
- {
- property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY;
- property.value.data.data = opts.user_property.name;
- property.value.data.len = (int)strlen(opts.user_property.name);
- property.value.value.data = opts.user_property.value;
- property.value.value.len = (int)strlen(opts.user_property.value);
- MQTTProperties_add(&props, &property);
- }
- pub_opts.properties = props;
- }
- else
- {
- pub_opts.onSuccess = onPublish;
- pub_opts.onFailure = onPublishFailure;
- }
- myconnect(client);
- while (!toStop)
- {
- int data_len = 0;
- int delim_len = 0;
- if (opts.stdin_lines)
- {
- buffer = malloc(opts.maxdatalen);
- delim_len = (int)strlen(opts.delimiter);
- do
- {
- buffer[data_len++] = getchar();
- if (data_len > delim_len)
- {
- if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
- break;
- }
- } while (data_len < opts.maxdatalen);
- rc = mypublish(client, data_len, buffer);
- }
- else
- mysleep(100);
- }
- if (opts.message == 0 && opts.null_message == 0 && opts.filename == 0)
- free(buffer);
- if (opts.MQTTVersion >= MQTTVERSION_5)
- disc_opts.onSuccess5 = onDisconnect5;
- else
- disc_opts.onSuccess = onDisconnect;
- if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
- {
- if (!opts.quiet)
- fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
- exit(EXIT_FAILURE);
- }
- while (!disconnected)
- mysleep(100);
- MQTTAsync_destroy(&client);
- return EXIT_SUCCESS;
- }
|