stream.cpp

#include "stream.h"
#include <c10/util/Logging.h>
#include <stdio.h>
#include <string.h>
#include "util.h"

namespace ffmpeg {

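// AV_TIME_BASE is 1,000,000, so rescaling with timeBaseQ expresses timestamps
// in microseconds.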
const AVRational timeBaseQ = AVRational{1, AV_TIME_BASE};

Stream::Stream(
    AVFormatContext* inputCtx,
    MediaFormat format,
    bool convertPtsToWallTime,
    int64_t loggingUuid)
    : inputCtx_(inputCtx),
      format_(format),
      convertPtsToWallTime_(convertPtsToWallTime),
      loggingUuid_(loggingUuid) {}

Stream::~Stream() {
  if (frame_) {
    // av_frame_free also releases any data buffers still referenced by the
    // frame; plain av_free would only free the AVFrame struct itself
    av_frame_free(&frame_);
  }
  if (codecCtx_) {
    avcodec_free_context(&codecCtx_);
  }
}

// Look up the proper codec for the stream's codec id.
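// Note: avcodec_find_decoder() returns a const AVCodec* in FFmpeg 5.0+; the
// C-style cast below drops that const, presumably so the non-const AVCodec*
// return type keeps compiling across FFmpeg versions.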
AVCodec* Stream::findCodec(AVCodecParameters* params) {
  return (AVCodec*)avcodec_find_decoder(params->codec_id);
}

// Allocate memory for the AVCodecContext, which will hold the context for
// the decode/encode process. Then fill this codec context with the codec
// parameters defined in the stream parameters. Open the codec, and allocate
// the global frame defined in the header file.
int Stream::openCodec(std::vector<DecoderMetadata>* metadata, int num_threads) {
  AVStream* steam = inputCtx_->streams[format_.stream];

  AVCodec* codec = findCodec(steam->codecpar);
  if (!codec) {
    LOG(ERROR) << "LoggingUuid #" << loggingUuid_
               << ", avcodec_find_decoder failed for codec_id: "
               << int(steam->codecpar->codec_id);
    return AVERROR(EINVAL);
  }

  if (!(codecCtx_ = avcodec_alloc_context3(codec))) {
    LOG(ERROR) << "LoggingUuid #" << loggingUuid_
               << ", avcodec_alloc_context3 failed";
    return AVERROR(ENOMEM);
  }

  // multithreading heuristics:
  // clamp a user-provided thread count to max_threads
  if (num_threads > max_threads) {
    num_threads = max_threads;
  }

  if (num_threads > 0) {
    // if user defined, respect that
    // note that default thread_type will be used
    codecCtx_->thread_count = num_threads;
  } else {
    // otherwise set sensible defaults
    // with the special case for the different MPEG4 codecs
    // that don't have threading context functions
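    // (FF_THREAD_FRAME decodes several frames in parallel at the cost of extra
    // latency, while FF_THREAD_SLICE decodes multiple slices of the same frame
    // in parallel and only helps when the stream is actually coded with
    // slices.)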
    if (codecCtx_->codec->capabilities & AV_CODEC_CAP_INTRA_ONLY) {
      codecCtx_->thread_type = FF_THREAD_FRAME;
      codecCtx_->thread_count = 2;
    } else {
      codecCtx_->thread_count = 8;
      codecCtx_->thread_type = FF_THREAD_SLICE;
    }
  }

  int ret;
  // Copy codec parameters from input stream to output codec context
  if ((ret = avcodec_parameters_to_context(codecCtx_, steam->codecpar)) < 0) {
    LOG(ERROR) << "LoggingUuid #" << loggingUuid_
               << ", avcodec_parameters_to_context failed";
    return ret;
  }

  // after avcodec_open2, the value of codecCtx_->time_base is NOT meaningful
  if ((ret = avcodec_open2(codecCtx_, codec, nullptr)) < 0) {
    LOG(ERROR) << "LoggingUuid #" << loggingUuid_
               << ", avcodec_open2 failed: " << Util::generateErrorDesc(ret);
    avcodec_free_context(&codecCtx_);
    codecCtx_ = nullptr;
    return ret;
  }

  frame_ = av_frame_alloc();

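  // fps_ doubles as the pts increment base for setFramePts(): for video it is
  // the (guessed) frame rate, for audio it is the sample rate, so nextPts_ can
  // be advanced in AV_TIME_BASE (microsecond) units.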
  switch (format_.type) {
    case TYPE_VIDEO:
      fps_ = av_q2d(av_guess_frame_rate(inputCtx_, steam, nullptr));
      break;
    case TYPE_AUDIO:
      fps_ = codecCtx_->sample_rate;
      break;
    default:
      fps_ = 30.0;
  }

  if ((ret = initFormat())) {
    LOG(ERROR) << "initFormat failed, type: " << format_.type;
  }

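  // Report the stream's time base (num/den) and its duration, rescaled to
  // AV_TIME_BASE (microsecond) units, back to the caller.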
  if (metadata) {
    DecoderMetadata header;
    header.format = format_;
    header.fps = fps_;
    header.num = steam->time_base.num;
    header.den = steam->time_base.den;
    header.duration =
        av_rescale_q(steam->duration, steam->time_base, timeBaseQ);
    metadata->push_back(header);
  }

  return ret;
}

// Send the raw data packet (compressed frame) to the decoder through the codec
// context, and receive the raw data frame (uncompressed frame) back from the
// decoder through the same codec context.
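// avcodec_send_packet/avcodec_receive_frame semantics (per the FFmpeg docs):
//  - AVERROR(EAGAIN) from send_packet means the decoder's internal buffer is
//    full and frames must be received before it accepts more input;
//  - a nullptr packet switches the decoder into draining (flush) mode;
//  - AVERROR_EOF from receive_frame means the decoder has been fully drained.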
int Stream::analyzePacket(const AVPacket* packet, bool* gotFrame) {
  int consumed = 0;
  int result = avcodec_send_packet(codecCtx_, packet);
  if (result == AVERROR(EAGAIN)) {
    *gotFrame = false; // no bytes get consumed, fetch frame
  } else if (result == AVERROR_EOF) {
    *gotFrame = false; // more than one flush packet
    if (packet) {
      // got packet after flush, this is an error
      return result;
    }
  } else if (result < 0) {
    LOG(ERROR) << "avcodec_send_packet failed, err: "
               << Util::generateErrorDesc(result);
    return result; // error
  } else {
    consumed = packet ? packet->size : 0; // all bytes get consumed
  }

  result = avcodec_receive_frame(codecCtx_, frame_);
  if (result >= 0) {
    *gotFrame = true; // frame is available
  } else if (result == AVERROR(EAGAIN)) {
    *gotFrame = false; // no frames at this time, needs more packets
    if (!consumed) {
      // precaution, if no packets got consumed and no frames are available
      return result;
    }
  } else if (result == AVERROR_EOF) {
    *gotFrame = false; // the last frame has been flushed
    // precaution, if no more frames are available assume we consumed all bytes
    consumed = 0;
  } else { // error
    LOG(ERROR) << "avcodec_receive_frame failed, err: "
               << Util::generateErrorDesc(result);
    return result;
  }
  return consumed;
}

// General decoding function:
// given the packet, analyze the metadata, and write the
// metadata and the buffer to the DecoderOutputMessage.
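// A nullptr packet drains the decoder; in that case a message may be emitted
// even when no new frame was produced (getMessage is called with flush=true).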
int Stream::decodePacket(
    const AVPacket* packet,
    DecoderOutputMessage* out,
    bool headerOnly,
    bool* hasMsg) {
  int consumed;
  bool gotFrame = false;
  *hasMsg = false;
  if ((consumed = analyzePacket(packet, &gotFrame)) >= 0 &&
      (packet == nullptr || gotFrame)) {
    int result;
    if ((result = getMessage(out, !gotFrame, headerOnly)) < 0) {
      return result; // report error
    }
    *hasMsg = result > 0;
  }
  return consumed;
}

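// Drain helper: decodes with a nullptr packet. Returns 1 while flushed
// messages are still being produced, 0 (after resetting the codec buffers)
// once the decoder is fully drained, or a negative error code on failure.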
int Stream::flush(DecoderOutputMessage* out, bool headerOnly) {
  bool hasMsg = false;
  int result = decodePacket(nullptr, out, headerOnly, &hasMsg);
  if (result < 0) {
    avcodec_flush_buffers(codecCtx_);
    return result;
  }
  if (!hasMsg) {
    avcodec_flush_buffers(codecCtx_);
    return 0;
  }
  return 1;
}

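// A typical caller loop (sketch only; it assumes a driver that owns the
// demuxer context and a DecoderOutputMessage `msg`, neither of which is
// defined in this file):
//
//   AVPacket packet;
//   while (av_read_frame(inputCtx, &packet) >= 0) {
//     bool hasMsg = false;
//     stream->decodePacket(&packet, &msg, /*headerOnly=*/false, &hasMsg);
//     if (hasMsg) { /* hand msg downstream */ }
//     av_packet_unref(&packet);
//   }
//   while (stream->flush(&msg, /*headerOnly=*/false) > 0) { /* drain */ }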
// Sets the header and the payload via Stream::setHeader and copyFrameBytes;
// the latter is implemented by the per-type Stream subclasses (VideoStream,
// AudioStream, ...).
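// Returns a positive value when *out holds a complete message, 0 when there is
// nothing to emit, or a negative libav error code.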
int Stream::getMessage(DecoderOutputMessage* out, bool flush, bool headerOnly) {
  if (flush) {
    // only flushing audio frames makes sense
    if (format_.type == TYPE_AUDIO) {
      int processed = 0;
      size_t total = 0;
      // grab all audio bytes by chunks
      do {
        if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
          return processed;
        }
        total += processed;
      } while (processed);
      if (total) {
        // set header if message bytes are available
        setHeader(&out->header, flush);
        return 1;
      }
    }
    return 0;
  } else {
    if (format_.type == TYPE_AUDIO) {
      int processed = 0;
      if ((processed = copyFrameBytes(out->payload.get(), flush)) < 0) {
        return processed;
      }
      if (processed) {
        // set header if message bytes are available
        setHeader(&out->header, flush);
        return 1;
      }
      return 0;
    } else {
      // set header
      setHeader(&out->header, flush);
      if (headerOnly) {
        // only the header is requested
        return 1;
      }
      return copyFrameBytes(out->payload.get(), flush);
    }
  }
}

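// Fills the output header: a monotonically increasing sequence number, the
// frame pts in microseconds (optionally adjusted to wall-clock time via
// keeper_), and the media format. keyFrame and fps are reset to defaults here.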
void Stream::setHeader(DecoderHeader* header, bool flush) {
  header->seqno = numGenerator_++;
  setFramePts(header, flush);
  if (convertPtsToWallTime_) {
    keeper_.adjust(header->pts);
  }
  header->format = format_;
  header->keyFrame = 0;
  header->fps = std::numeric_limits<double>::quiet_NaN();
}

void Stream::setFramePts(DecoderHeader* header, bool flush) {
  if (flush) {
    header->pts = nextPts_; // already in us
  } else {
    header->pts = frame_->best_effort_timestamp;
    if (header->pts == AV_NOPTS_VALUE) {
      header->pts = nextPts_;
    } else {
      header->pts = av_rescale_q(
          header->pts,
          inputCtx_->streams[format_.stream]->time_base,
          timeBaseQ);
    }

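    // Estimate the pts of the next frame in microseconds: audio advances by
    // the duration of this frame's samples (nb_samples / sample rate), video
    // by one frame period (1 / fps_).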
    switch (format_.type) {
      case TYPE_AUDIO:
        nextPts_ = header->pts + frame_->nb_samples * AV_TIME_BASE / fps_;
        break;
      case TYPE_VIDEO:
        nextPts_ = header->pts + AV_TIME_BASE / fps_;
        break;
      default:
        nextPts_ = header->pts;
    }
  }
}

} // namespace ffmpeg