sync_impl.hpp 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. /* ----header---- */
  2. /* common header */
  3. #include "ros/ros.h"
  4. #include "std_msgs/Header.h"
  5. #include <std_msgs/Float64.h>
  6. #include <ros/callback_queue.h>
  7. #include <boost/circular_buffer.hpp>
  8. #include <vector>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <signal.h>
  13. #include <sys/stat.h>
  14. #include <sys/select.h>
  15. #include <mqueue.h>
  16. #include <fcntl.h>
  17. #include <errno.h>
  18. #include <unistd.h>
  19. #include <pthread.h>
  20. #define PRINT
  21. extern "C" double fabs_time_diff(std_msgs::Header *timespec1, std_msgs::Header *timespec2) {
  22. double time1 = (double)timespec1->stamp.sec + (double)timespec1->stamp.nsec/1000000000L;
  23. double time2 = (double)timespec2->stamp.sec + (double)timespec2->stamp.nsec/1000000000L;
  24. return fabs(time1 - time2);
  25. }
  26. extern "C" double get_time(const std_msgs::Header *timespec) {
  27. return (double)timespec->stamp.sec + (double)timespec->stamp.nsec/1000000000L;
  28. }
  29. template<typename T1, typename T2, typename T3>
  30. Synchronizer<T1, T2, T3>::Synchronizer(const std::string sub1_topic, const std::string sub2_topic, const std::string pub1_topic, const std::string pub2_topic, const std::string req_topic, const std::string ns) :
  31. type1_ringbuf_(10), type2_ringbuf_(10)
  32. {
  33. /* init */
  34. pthread_mutex_init(&mutex_, NULL);
  35. ros::NodeHandle nh;
  36. buf_flag_ = false;
  37. is_req_ = false;
  38. type1_sub_ = nh.subscribe(sub1_topic, 1, &Synchronizer::type1_callback, this);
  39. type2_sub_ = nh.subscribe(sub2_topic, 1, &Synchronizer::type2_callback, this);
  40. #if 0
  41. ros::NodeHandle nh_rcv;
  42. nh_rcv.setCallbackQueue(&rcv_callbackqueue_);
  43. req_sub_ = nh_rcv.subscribe(req_topic, 1, &Synchronizer::req_callback, this);
  44. #endif
  45. req_topic_ = req_topic;
  46. type1_pub_ = nh.advertise<T1>(ns+pub1_topic, 5);
  47. type2_pub_ = nh.advertise<T2>(ns+pub2_topic, 5);
  48. sync_time_diff_pub_ = nh.advertise<std_msgs::Float64>("/"+ns+"/time_diff", 5);
  49. }
  50. template<typename T1, typename T2, typename T3>
  51. void Synchronizer<T1, T2, T3>::run() {
  52. /* create server thread */
  53. pthread_t th;
  54. pthread_create(&th, NULL, launchThread, this);
  55. while (!buf_flag_) {
  56. ros::spinOnce();
  57. usleep(100000);
  58. if(!ros::ok())
  59. return;
  60. }
  61. pthread_mutex_lock(&mutex_);
  62. if(!publish()) {
  63. /* when to publish is failure, republish */
  64. struct timespec sleep_time;
  65. sleep_time.tv_sec = 0;
  66. sleep_time.tv_nsec = 200000000; //5Hz
  67. while (!publish() && ros::ok())
  68. nanosleep(&sleep_time, NULL);
  69. }
  70. pthread_mutex_unlock(&mutex_);
  71. ros::spin();
  72. /* shutdown server thread */
  73. ROS_DEBUG("wait until shutdown a thread");
  74. pthread_mutex_trylock(&mutex_);
  75. pthread_mutex_unlock(&mutex_);
  76. pthread_kill(th, SIGINT);
  77. pthread_join(th, NULL);
  78. }
  79. template<typename T1, typename T2, typename T3>
  80. void Synchronizer<T1, T2, T3>::thread() {
  81. #if 1
  82. ros::NodeHandle nh_rcv;
  83. ros::CallbackQueue rcv_callbackqueue_;
  84. nh_rcv.setCallbackQueue(&rcv_callbackqueue_);
  85. ros::Subscriber req_sub = nh_rcv.subscribe(req_topic_, 1, &Synchronizer::req_callback, this);
  86. while (nh_rcv.ok()) {
  87. #else
  88. while (ros::ok()) {
  89. #endif
  90. rcv_callbackqueue_.callAvailable(ros::WallDuration(3.0f));
  91. pthread_mutex_lock(&mutex_);
  92. bool flag = (is_req_ == false && buf_flag_ == true);
  93. if (flag) {
  94. ROS_DEBUG("timeout");
  95. if(!publish()) {
  96. /* when to publish is failure, republish */
  97. struct timespec sleep_time;
  98. sleep_time.tv_sec = 0;
  99. sleep_time.tv_nsec = 200000000; //5Hz
  100. while (!publish() && ros::ok() && nh_rcv.ok())
  101. nanosleep(&sleep_time, NULL);
  102. }
  103. }
  104. pthread_mutex_unlock(&mutex_);
  105. }
  106. }
  107. template<typename T1, typename T2, typename T3>
  108. void Synchronizer<T1, T2, T3>::publish_msg(T1* p_type1_buf, T2* p_type2_buf) {
  109. ROS_DEBUG("publish");
  110. type1_pub_.publish(*p_type1_buf);
  111. type2_pub_.publish(*p_type2_buf);
  112. std_msgs::Float64 time_diff;
  113. time_diff.data = fabs_time_diff(&p_type1_buf->header, &p_type2_buf->header);
  114. sync_time_diff_pub_.publish(time_diff);
  115. }
  116. template<typename T1, typename T2, typename T3>
  117. bool Synchronizer<T1, T2, T3>::publish() {
  118. if (buf_flag_) {
  119. //image_obj_ranged is empty
  120. if (type1_ringbuf_.begin() == type1_ringbuf_.end()) {
  121. ROS_DEBUG("type1 ring buffer is empty");
  122. return false;
  123. }
  124. //image_raw is empty
  125. if (type2_ringbuf_.begin() == type2_ringbuf_.end()) {
  126. ROS_DEBUG("type2 ring buffer is empty");
  127. return false;
  128. }
  129. // image_obj_ranged > image_raw
  130. if (get_time(&(type1_ringbuf_.front().header)) >= get_time(&(type2_ringbuf_.front().header))) {
  131. p_type2_buf_ = &(type2_ringbuf_.front());
  132. typename boost::circular_buffer<T1>::iterator it = type1_ringbuf_.begin();
  133. if (type1_ringbuf_.size() == 1) {
  134. p_type1_buf_ = &*it;
  135. publish_msg(p_type1_buf_, p_type2_buf_);
  136. if (is_req_ == true){
  137. buf_flag_ = false;
  138. is_req_ = false;
  139. if (fabs_time_diff(&p_type1_buf_->header, &p_type2_buf_->header) == 0.0) {
  140. type1_ringbuf_.clear();
  141. type2_ringbuf_.clear();
  142. }
  143. }
  144. return true;
  145. } else {
  146. for (it++; it != type1_ringbuf_.end(); it++) {
  147. if (fabs_time_diff(&(type2_ringbuf_.front().header), &((it-1)->header))
  148. < fabs_time_diff(&(type2_ringbuf_.front().header), &(it->header))) {
  149. p_type1_buf_ = &*(it-1);
  150. break;
  151. }
  152. }
  153. if (it == type1_ringbuf_.end()) {
  154. p_type1_buf_ = &(type1_ringbuf_.back());
  155. }
  156. }
  157. }
  158. // image_obj_ranged < image_raw
  159. else {
  160. p_type1_buf_ = &(type1_ringbuf_.front());
  161. typename boost::circular_buffer<T2>::iterator it = type2_ringbuf_.begin();
  162. if (type2_ringbuf_.size() == 1) {
  163. p_type2_buf_ = &*it;
  164. publish_msg(p_type1_buf_, p_type2_buf_);
  165. if (is_req_ == true){
  166. buf_flag_ = false;
  167. is_req_ = false;
  168. if (fabs_time_diff(&p_type1_buf_->header, &p_type2_buf_->header) == 0.0) {
  169. type1_ringbuf_.clear();
  170. type2_ringbuf_.clear();
  171. }
  172. }
  173. return true;
  174. }
  175. for (it++; it != type2_ringbuf_.end(); it++) {
  176. if (fabs_time_diff(&(type1_ringbuf_.front().header), &((it-1)->header))
  177. < fabs_time_diff(&(type1_ringbuf_.front().header), &(it->header))) {
  178. p_type2_buf_ = &*(it-1);
  179. break;
  180. }
  181. }
  182. if (it == type2_ringbuf_.end()) {
  183. p_type2_buf_ = &(type2_ringbuf_.back());
  184. }
  185. }
  186. publish_msg(p_type1_buf_, p_type2_buf_);
  187. if (is_req_ == true){
  188. buf_flag_ = false;
  189. is_req_ = false;
  190. if (fabs_time_diff(&p_type1_buf_->header, &p_type2_buf_->header) == 0.0) {
  191. type1_ringbuf_.clear();
  192. type2_ringbuf_.clear();
  193. }
  194. }
  195. return true;
  196. } else {
  197. return false;
  198. }
  199. }
  200. template<typename T1, typename T2, typename T3>
  201. void Synchronizer<T1, T2, T3>::type1_callback(const typename T1::ConstPtr& type1_msg) {
  202. ROS_DEBUG("catch type1 topic");
  203. pthread_mutex_lock(&mutex_);
  204. type1_ringbuf_.push_front(*type1_msg);
  205. //image_raw is empty
  206. if (type2_ringbuf_.begin() == type2_ringbuf_.end()) {
  207. ROS_DEBUG("image_raw ring buffer is empty");
  208. buf_flag_ = false;
  209. pthread_mutex_unlock(&mutex_);
  210. return;
  211. }
  212. buf_flag_ = true;
  213. pthread_mutex_unlock(&mutex_);
  214. pthread_mutex_lock(&mutex_);
  215. if (is_req_ == true) {
  216. publish();
  217. }
  218. pthread_mutex_unlock(&mutex_);
  219. }
  220. template<typename T1, typename T2, typename T3>
  221. void Synchronizer<T1, T2, T3>::type2_callback(const typename T2::ConstPtr& type2_msg) {
  222. ROS_DEBUG("catch type2 topic");
  223. pthread_mutex_lock(&mutex_);
  224. type2_ringbuf_.push_front(*type2_msg);
  225. //image_obj_ranged is empty
  226. if (type1_ringbuf_.begin() == type1_ringbuf_.end()) {
  227. ROS_DEBUG("image_obj_ranged ring buffer is empty");
  228. buf_flag_ = false;
  229. pthread_mutex_unlock(&mutex_);
  230. return;
  231. }
  232. buf_flag_ = true;
  233. pthread_mutex_unlock(&mutex_);
  234. pthread_mutex_lock(&mutex_);
  235. if (is_req_ == true) {
  236. publish();
  237. }
  238. pthread_mutex_unlock(&mutex_);
  239. }
  240. template<typename T1, typename T2, typename T3>
  241. void Synchronizer<T1, T2, T3>::req_callback(const typename T3::ConstPtr& req_msg) {
  242. pthread_mutex_lock(&mutex_);
  243. is_req_ = true;
  244. ROS_DEBUG("catch publish request");
  245. if (publish() == false) {
  246. ROS_DEBUG("waitting...");
  247. }
  248. pthread_mutex_unlock(&mutex_);
  249. }