test.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /* ----header---- */
  2. /* common header */
  3. #include "ros/ros.h"
  4. #include <ros/callback_queue.h>
  5. #include <boost/circular_buffer.hpp>
  6. #include <vector>
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <string.h>
  10. #include <signal.h>
  11. #include <sys/stat.h>
  12. #include <sys/select.h>
  13. #include <mqueue.h>
  14. #include <fcntl.h>
  15. #include <errno.h>
  16. #include <unistd.h>
  17. #include <pthread.h>
  18. #include "t_sync_message.h"
  19. /* user header */
  20. #include "cv_tracker/obj_label.h"
  21. #include "autoware_msgs/Centroids.h"
  22. #include "visualization_msgs/MarkerArray.h"
  23. /* ----mode---- */
  24. #define _REQ_PUB 1
  25. /* ----var---- */
  26. /* common var */
  27. bool buf_flag;
  28. pthread_mutex_t mutex;
  29. /* user var */
  30. boost::circular_buffer<cv_tracker::obj_label> obj_label_ringbuf(10);
  31. boost::circular_buffer<autoware_msgs::Centroids> cluster_centroids_ringbuf(10);
  32. ros::Publisher obj_label__pub;
  33. ros::Publisher cluster_centroids__pub;
  34. bool obj_pose_flag;
  35. /* ----function---- */
  36. double fabs_time_diff(std_msgs::Header *timespec1, std_msgs::Header *timespec2) {
  37. double time1 = (double)timespec1->stamp.sec + (double)timespec1->stamp.nsec/1000000000L;
  38. double time2 = (double)timespec2->stamp.sec + (double)timespec2->stamp.nsec/1000000000L;
  39. return fabs(time1 - time2);
  40. }
  41. double get_time(const std_msgs::Header *timespec) {
  42. return (double)timespec->stamp.sec + (double)timespec->stamp.nsec/1000000000L;
  43. }
  44. #if _REQ_PUB
  45. cv_tracker::obj_label* p_obj_label_buf;
  46. autoware_msgs::Centroids* p_cluster_centroids_buf;
  47. void publish_msg(cv_tracker::obj_label* p_obj_label_buf, autoware_msgs::Centroids* p_cluster_centroids_buf)
  48. {
  49. ROS_INFO("publish");
  50. obj_label__pub.publish(*p_obj_label_buf);
  51. cluster_centroids__pub.publish(*p_cluster_centroids_buf);
  52. }
  53. bool publish() {
  54. if (buf_flag) {
  55. //obj_label is empty
  56. if (obj_label_ringbuf.begin() == obj_label_ringbuf.end()) {
  57. ROS_INFO("obj_label ring buffer is empty");
  58. return false;
  59. }
  60. //cluster_centroids is empty
  61. if (cluster_centroids_ringbuf.begin() == cluster_centroids_ringbuf.end()) {
  62. ROS_INFO("cluster_centroids ring buffer is empty");
  63. return false;
  64. }
  65. // obj_label > cluster_centroids
  66. if (get_time(&(obj_label_ringbuf.front().header)) >= get_time(&(cluster_centroids_ringbuf.front().header))) {
  67. p_cluster_centroids_buf = &(cluster_centroids_ringbuf.front());
  68. boost::circular_buffer<cv_tracker::obj_label>::iterator it = obj_label_ringbuf.begin();
  69. if (obj_label_ringbuf.size() == 1) {
  70. p_obj_label_buf = &*it;
  71. publish_msg(p_obj_label_buf, p_cluster_centroids_buf);
  72. if (obj_pose_flag == true){
  73. buf_flag = false;
  74. obj_pose_flag = false;
  75. obj_label_ringbuf.clear();
  76. cluster_centroids_ringbuf.clear();
  77. }
  78. return true;
  79. } else {
  80. for (it++; it != obj_label_ringbuf.end(); it++) {
  81. if (fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &((it-1)->header))
  82. < fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &(it->header))) {
  83. p_obj_label_buf = &*(it-1);
  84. break;
  85. }
  86. }
  87. if (it == obj_label_ringbuf.end()) {
  88. p_obj_label_buf = &(obj_label_ringbuf.back());
  89. }
  90. }
  91. }
  92. // obj_label < cluster_centroids
  93. else {
  94. p_obj_label_buf = &(obj_label_ringbuf.front());
  95. boost::circular_buffer<autoware_msgs::Centroids>::iterator it = cluster_centroids_ringbuf.begin();
  96. if (cluster_centroids_ringbuf.size() == 1) {
  97. p_cluster_centroids_buf = &*it;
  98. publish_msg(p_obj_label_buf, p_cluster_centroids_buf);
  99. if (obj_pose_flag == true){
  100. buf_flag = false;
  101. obj_pose_flag = false;
  102. obj_label_ringbuf.clear();
  103. cluster_centroids_ringbuf.clear();
  104. }
  105. return true;
  106. }
  107. for (it++; it != cluster_centroids_ringbuf.end(); it++) {
  108. if (fabs_time_diff(&(obj_label_ringbuf.front().header), &((it-1)->header))
  109. < fabs_time_diff(&(obj_label_ringbuf.front().header), &(it->header))) {
  110. p_cluster_centroids_buf = &*(it-1);
  111. break;
  112. }
  113. }
  114. if (it == cluster_centroids_ringbuf.end()) {
  115. p_cluster_centroids_buf = &(cluster_centroids_ringbuf.back());
  116. }
  117. }
  118. publish_msg(p_obj_label_buf, p_cluster_centroids_buf);
  119. if (obj_pose_flag == true){
  120. buf_flag = false;
  121. obj_pose_flag = false;
  122. obj_label_ringbuf.clear();
  123. cluster_centroids_ringbuf.clear();
  124. }
  125. return true;
  126. } else {
  127. return false;
  128. }
  129. }
  130. void obj_label_callback(const cv_tracker::obj_label::ConstPtr& obj_label_msg) {
  131. pthread_mutex_lock(&mutex);
  132. obj_label_ringbuf.push_front(*obj_label_msg);
  133. //cluster_centroids is empty
  134. if (cluster_centroids_ringbuf.begin() == cluster_centroids_ringbuf.end()) {
  135. pthread_mutex_unlock(&mutex);
  136. ROS_INFO("cluster_centroids ring buffer is empty");
  137. return;
  138. }
  139. buf_flag = true;
  140. pthread_mutex_unlock(&mutex);
  141. pthread_mutex_lock(&mutex);
  142. if (obj_pose_flag == true) {
  143. publish();
  144. }
  145. pthread_mutex_unlock(&mutex);
  146. }
  147. void cluster_centroids_callback(const autoware_msgs::Centroids::ConstPtr& cluster_centroids_msg) {
  148. pthread_mutex_lock(&mutex);
  149. cluster_centroids_ringbuf.push_front(*cluster_centroids_msg);
  150. //obj_label is empty
  151. if (obj_label_ringbuf.begin() == obj_label_ringbuf.end()) {
  152. ROS_INFO("obj_label ring buffer is empty");
  153. pthread_mutex_unlock(&mutex);
  154. return;
  155. }
  156. buf_flag = true;
  157. pthread_mutex_unlock(&mutex);
  158. pthread_mutex_lock(&mutex);
  159. if (obj_pose_flag == true) {
  160. publish();
  161. }
  162. pthread_mutex_unlock(&mutex);
  163. }
  164. #else
  165. cv_tracker::obj_label obj_label_buf;
  166. autoware_msgs::Centroids cluster_centroids_buf;
  167. bool publish() {
  168. if (buf_flag) {
  169. ROS_INFO("publish");
  170. obj_label__pub.publish(obj_label_buf);
  171. cluster_centroids__pub.publish(cluster_centroids_buf);
  172. if (obj_pose_flag == true){
  173. buf_flag = false;
  174. obj_pose_flag = false;
  175. obj_label_ringbuf.clear();
  176. cluster_centroids_ringbuf.clear();
  177. }
  178. return true;
  179. } else {
  180. ROS_INFO("publish failed");
  181. return false;
  182. }
  183. }
  184. void obj_label_callback(const cv_tracker::obj_label::ConstPtr& obj_label_msg) {
  185. pthread_mutex_lock(&mutex);
  186. obj_label_ringbuf.push_front(*obj_label_msg);
  187. //cluster_centroids is empty
  188. if (cluster_centroids_ringbuf.begin() == cluster_centroids_ringbuf.end()) {
  189. pthread_mutex_unlock(&mutex);
  190. ROS_INFO("cluster_centroids ring buffer is empty");
  191. return;
  192. }
  193. buf_flag = true;
  194. // obj_label > cluster_centroids
  195. if (get_time(&(obj_label_ringbuf.front().header)) >= get_time(&(cluster_centroids_ringbuf.front().header))) {
  196. cluster_centroids_buf = cluster_centroids_ringbuf.front();
  197. boost::circular_buffer<cv_tracker::obj_label>::iterator it = obj_label_ringbuf.begin();
  198. if (obj_label_ringbuf.size() == 1) {
  199. obj_label_buf = *it;
  200. pthread_mutex_unlock(&mutex);
  201. pthread_mutex_lock(&mutex);
  202. if (obj_pose_flag == true) {
  203. publish();
  204. }
  205. pthread_mutex_unlock(&mutex);
  206. return;
  207. } else {
  208. for (it++; it != obj_label_ringbuf.end(); it++) {
  209. if (fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &((it-1)->header))
  210. < fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &(it->header))) {
  211. obj_label_buf = *(it-1);
  212. break;
  213. }
  214. }
  215. if (it == obj_label_ringbuf.end()) {
  216. obj_label_buf = obj_label_ringbuf.back();
  217. }
  218. }
  219. } else {
  220. obj_label_buf = obj_label_ringbuf.front();
  221. boost::circular_buffer<autoware_msgs::Centroids>::iterator it = cluster_centroids_ringbuf.begin();
  222. if (cluster_centroids_ringbuf.size() == 1) {
  223. cluster_centroids_buf = *it;
  224. pthread_mutex_unlock(&mutex);
  225. pthread_mutex_lock(&mutex);
  226. if (obj_pose_flag == true) {
  227. publish();
  228. }
  229. pthread_mutex_unlock(&mutex);
  230. return;
  231. }
  232. for (it++; it != cluster_centroids_ringbuf.end(); it++) {
  233. if (fabs_time_diff(&(obj_label_ringbuf.front().header), &((it-1)->header))
  234. < fabs_time_diff(&(obj_label_ringbuf.front().header), &(it->header))) {
  235. cluster_centroids_buf = *(it-1);
  236. break;
  237. }
  238. }
  239. if (it == cluster_centroids_ringbuf.end()) {
  240. cluster_centroids_buf = cluster_centroids_ringbuf.back();
  241. }
  242. }
  243. pthread_mutex_unlock(&mutex);
  244. }
  245. void cluster_centroids_callback(const autoware_msgs::Centroids::ConstPtr& cluster_centroids_msg) {
  246. pthread_mutex_lock(&mutex);
  247. cluster_centroids_ringbuf.push_front(*cluster_centroids_msg);
  248. //obj_label is empty
  249. if (obj_label_ringbuf.begin() == obj_label_ringbuf.end()) {
  250. ROS_INFO("obj_label ring buffer is empty");
  251. pthread_mutex_unlock(&mutex);
  252. return;
  253. }
  254. buf_flag = true;
  255. // obj_label > cluster_centroids
  256. if (get_time(&(obj_label_ringbuf.front().header)) >= get_time(&(cluster_centroids_ringbuf.front().header))) {
  257. cluster_centroids_buf = cluster_centroids_ringbuf.front();
  258. boost::circular_buffer<cv_tracker::obj_label>::iterator it = obj_label_ringbuf.begin();
  259. if (obj_label_ringbuf.size() == 1) {
  260. obj_label_buf = *it;
  261. pthread_mutex_unlock(&mutex);
  262. pthread_mutex_lock(&mutex);
  263. if (obj_pose_flag == true) {
  264. publish();
  265. }
  266. pthread_mutex_unlock(&mutex);
  267. return;
  268. } else {
  269. for (it++; it != obj_label_ringbuf.end(); it++) {
  270. if (fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &((it-1)->header))
  271. < fabs_time_diff(&(cluster_centroids_ringbuf.front().header), &(it->header))) {
  272. obj_label_buf = *(it-1);
  273. break;
  274. }
  275. }
  276. if (it == obj_label_ringbuf.end()) {
  277. obj_label_buf = obj_label_ringbuf.back();
  278. }
  279. }
  280. } else {
  281. obj_label_buf = obj_label_ringbuf.front();
  282. boost::circular_buffer<autoware_msgs::Centroids>::iterator it = cluster_centroids_ringbuf.begin();
  283. if (cluster_centroids_ringbuf.size() == 1) {
  284. cluster_centroids_buf = *it;
  285. pthread_mutex_unlock(&mutex);
  286. pthread_mutex_lock(&mutex);
  287. if (obj_pose_flag == true) {
  288. publish();
  289. }
  290. pthread_mutex_unlock(&mutex);
  291. return;
  292. }
  293. for (it++; it != cluster_centroids_ringbuf.end(); it++) {
  294. if (fabs_time_diff(&(obj_label_ringbuf.front().header), &((it-1)->header))
  295. < fabs_time_diff(&(obj_label_ringbuf.front().header), &(it->header))) {
  296. cluster_centroids_buf = *(it-1);
  297. break;
  298. }
  299. }
  300. if (it == cluster_centroids_ringbuf.end()) {
  301. cluster_centroids_buf = cluster_centroids_ringbuf.back();
  302. }
  303. }
  304. pthread_mutex_unlock(&mutex);
  305. }
  306. #endif
  307. void obj_pose_callback(const visualization_msgs::MarkerArray::ConstPtr& obj_pose_msg) {
  308. pthread_mutex_lock(&mutex);
  309. obj_pose_flag = true;
  310. ROS_INFO("catch publish request");
  311. if (publish() == false) {
  312. ROS_INFO("waitting...");
  313. }
  314. pthread_mutex_unlock(&mutex);}
  315. void* thread(void* args)
  316. {
  317. ros::NodeHandle nh_rcv;
  318. ros::CallbackQueue rcv_callbackqueue;
  319. nh_rcv.setCallbackQueue(&rcv_callbackqueue);
  320. ros::Subscriber obj_pose_sub = nh_rcv.subscribe("obj_pose", 5, obj_pose_callback);
  321. while (nh_rcv.ok()) {
  322. rcv_callbackqueue.callAvailable(ros::WallDuration(1.0f));
  323. pthread_mutex_lock(&mutex);
  324. bool flag = (obj_pose_flag == false && buf_flag == true);
  325. if (flag) {
  326. ROS_INFO("timeout");
  327. if(!publish()) {
  328. /* when to publish is failure, republish */
  329. struct timespec sleep_time;
  330. sleep_time.tv_sec = 0;
  331. sleep_time.tv_nsec = 200000000; //5Hz
  332. while (!publish() || ros::ok())
  333. nanosleep(&sleep_time, NULL);
  334. }
  335. }
  336. pthread_mutex_unlock(&mutex);
  337. }
  338. return NULL;
  339. }
  340. int main(int argc, char **argv) {
  341. ros::init(argc, argv, "sync_car_obj_fusion");
  342. ros::NodeHandle nh;
  343. /* create server thread */
  344. pthread_t th;
  345. pthread_create(&th, NULL, thread, (void *)NULL );
  346. ros::Subscriber obj_label_sub = nh.subscribe("/obj_car/obj_label", 1, obj_label_callback);
  347. ros::Subscriber cluster_centroids_sub = nh.subscribe("/cluster_centroids", 1, cluster_centroids_callback);
  348. obj_label__pub = nh.advertise<cv_tracker::obj_label>("/obj_car/obj_label_", 5);
  349. cluster_centroids__pub = nh.advertise<autoware_msgs::Centroids>("/cluster_centroids_", 5);
  350. while (!buf_flag) {
  351. ros::spinOnce();
  352. usleep(100000);
  353. }
  354. pthread_mutex_lock(&mutex);
  355. if(!publish()) {
  356. /* when to publish is failure, republish */
  357. struct timespec sleep_time;
  358. sleep_time.tv_sec = 0;
  359. sleep_time.tv_nsec = 200000000; //5Hz
  360. while (!publish() || ros::ok())
  361. nanosleep(&sleep_time, NULL);
  362. }
  363. pthread_mutex_lock(&mutex);
  364. ros::spin();
  365. /* shutdown server thread */
  366. ROS_INFO("wait until shutdown a thread");
  367. pthread_kill(th, SIGINT);
  368. pthread_join(th, NULL);
  369. return 0;
  370. }