123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699 |
- #include "decoder.h"
- #include <c10/util/Logging.h>
- #include <libavutil/avutil.h>
- #include <future>
- #include <iostream>
- #include <mutex>
- #include "audio_stream.h"
- #include "cc_stream.h"
- #include "subtitle_stream.h"
- #include "util.h"
- #include "video_stream.h"
- namespace ffmpeg {
- namespace {
- constexpr size_t kIoBufferSize = 96 * 1024;
- constexpr size_t kIoPaddingSize = AV_INPUT_BUFFER_PADDING_SIZE;
- constexpr size_t kLogBufferSize = 1024;
- bool mapFfmpegType(AVMediaType media, MediaType* type) {
- switch (media) {
- case AVMEDIA_TYPE_AUDIO:
- *type = TYPE_AUDIO;
- return true;
- case AVMEDIA_TYPE_VIDEO:
- *type = TYPE_VIDEO;
- return true;
- case AVMEDIA_TYPE_SUBTITLE:
- *type = TYPE_SUBTITLE;
- return true;
- case AVMEDIA_TYPE_DATA:
- *type = TYPE_CC;
- return true;
- default:
- return false;
- }
- }
- std::unique_ptr<Stream> createStream(
- MediaType type,
- AVFormatContext* ctx,
- int idx,
- bool convertPtsToWallTime,
- const FormatUnion& format,
- int64_t loggingUuid) {
- switch (type) {
- case TYPE_AUDIO:
- return std::make_unique<AudioStream>(
- ctx, idx, convertPtsToWallTime, format.audio);
- case TYPE_VIDEO:
- return std::make_unique<VideoStream>(
- // negative loggingUuid indicates video streams.
- ctx,
- idx,
- convertPtsToWallTime,
- format.video,
- -loggingUuid);
- case TYPE_SUBTITLE:
- return std::make_unique<SubtitleStream>(
- ctx, idx, convertPtsToWallTime, format.subtitle);
- case TYPE_CC:
- return std::make_unique<CCStream>(
- ctx, idx, convertPtsToWallTime, format.subtitle);
- default:
- return nullptr;
- }
- }
- } // Namespace
- /* static */
- void Decoder::logFunction(void* avcl, int level, const char* cfmt, va_list vl) {
- if (!avcl) {
- // Nothing can be done here
- return;
- }
- AVClass* avclass = *reinterpret_cast<AVClass**>(avcl);
- if (!avclass) {
- // Nothing can be done here
- return;
- }
- Decoder* decoder = nullptr;
- if (strcmp(avclass->class_name, "AVFormatContext") == 0) {
- AVFormatContext* context = reinterpret_cast<AVFormatContext*>(avcl);
- if (context) {
- decoder = reinterpret_cast<Decoder*>(context->opaque);
- }
- } else if (strcmp(avclass->class_name, "AVCodecContext") == 0) {
- AVCodecContext* context = reinterpret_cast<AVCodecContext*>(avcl);
- if (context) {
- decoder = reinterpret_cast<Decoder*>(context->opaque);
- }
- } else if (strcmp(avclass->class_name, "AVIOContext") == 0) {
- AVIOContext* context = reinterpret_cast<AVIOContext*>(avcl);
- // only if opaque was assigned to Decoder pointer
- if (context && context->read_packet == Decoder::readFunction) {
- decoder = reinterpret_cast<Decoder*>(context->opaque);
- }
- } else if (strcmp(avclass->class_name, "SWResampler") == 0) {
- // expect AVCodecContext as parent
- if (avclass->parent_log_context_offset) {
- AVClass** parent =
- *(AVClass***)(((uint8_t*)avcl) + avclass->parent_log_context_offset);
- AVCodecContext* context = reinterpret_cast<AVCodecContext*>(parent);
- if (context) {
- decoder = reinterpret_cast<Decoder*>(context->opaque);
- }
- }
- } else if (strcmp(avclass->class_name, "SWScaler") == 0) {
- // cannot find a way to pass context pointer through SwsContext struct
- } else {
- VLOG(2) << "Unknown context class: " << avclass->class_name;
- }
- if (decoder != nullptr && decoder->enableLogLevel(level)) {
- char buf[kLogBufferSize] = {0};
- // Format the line
- int* prefix = decoder->getPrintPrefix();
- *prefix = 1;
- av_log_format_line(avcl, level, cfmt, vl, buf, sizeof(buf) - 1, prefix);
- // pass message to the decoder instance
- std::string msg(buf);
- decoder->logCallback(level, msg);
- }
- }
- bool Decoder::enableLogLevel(int level) const {
- return ssize_t(level) <= params_.logLevel;
- }
- void Decoder::logCallback(int level, const std::string& message) {
- LOG(INFO) << "Msg, uuid=" << params_.loggingUuid << " level=" << level
- << " msg=" << message;
- }
- /* static */
- int Decoder::shutdownFunction(void* ctx) {
- Decoder* decoder = (Decoder*)ctx;
- if (decoder == nullptr) {
- return 1;
- }
- return decoder->shutdownCallback();
- }
- int Decoder::shutdownCallback() {
- return interrupted_ ? 1 : 0;
- }
- /* static */
- int Decoder::readFunction(void* opaque, uint8_t* buf, int size) {
- Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
- if (decoder == nullptr) {
- return 0;
- }
- return decoder->readCallback(buf, size);
- }
- /* static */
- int64_t Decoder::seekFunction(void* opaque, int64_t offset, int whence) {
- Decoder* decoder = reinterpret_cast<Decoder*>(opaque);
- if (decoder == nullptr) {
- return -1;
- }
- return decoder->seekCallback(offset, whence);
- }
- int Decoder::readCallback(uint8_t* buf, int size) {
- return seekableBuffer_.read(buf, size, params_.timeoutMs);
- }
- int64_t Decoder::seekCallback(int64_t offset, int whence) {
- return seekableBuffer_.seek(offset, whence, params_.timeoutMs);
- }
- /* static */
- void Decoder::initOnce() {
- static std::once_flag flagInit;
- std::call_once(flagInit, []() {
- #if LIBAVUTIL_VERSION_MAJOR < 56 // Before FFMPEG 4.0
- av_register_all();
- avcodec_register_all();
- #endif
- avformat_network_init();
- av_log_set_callback(Decoder::logFunction);
- av_log_set_level(AV_LOG_ERROR);
- VLOG(1) << "Registered ffmpeg libs";
- });
- }
- Decoder::Decoder() {
- initOnce();
- }
- Decoder::~Decoder() {
- cleanUp();
- }
- // Initialise the format context that holds information about the container and
- // fill it with minimal information about the format (codecs are not opened
- // here). Function reads in information about the streams from the container
- // into inputCtx and then passes it to decoder::openStreams. Finally, if seek is
- // specified within the decoder parameters, it seeks into the correct frame
- // (note, the seek defined here is "precise" seek).
- bool Decoder::init(
- const DecoderParameters& params,
- DecoderInCallback&& in,
- std::vector<DecoderMetadata>* metadata) {
- cleanUp();
- if ((params.uri.empty() || in) && (!params.uri.empty() || !in)) {
- LOG(ERROR)
- << "uuid=" << params_.loggingUuid
- << " either external URI gets provided or explicit input callback";
- return false;
- }
- // set callback and params
- params_ = params;
- if (!(inputCtx_ = avformat_alloc_context())) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " cannot allocate format context";
- return false;
- }
- AVInputFormat* fmt = nullptr;
- int result = 0;
- if (in) {
- ImageType type = ImageType::UNKNOWN;
- if ((result = seekableBuffer_.init(
- std::forward<DecoderInCallback>(in),
- params_.timeoutMs,
- params_.maxSeekableBytes,
- params_.isImage ? &type : nullptr)) < 0) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " can't initiate seekable buffer";
- cleanUp();
- return false;
- }
- if (params_.isImage) {
- const char* fmtName = "image2";
- switch (type) {
- case ImageType::JPEG:
- fmtName = "jpeg_pipe";
- break;
- case ImageType::PNG:
- fmtName = "png_pipe";
- break;
- case ImageType::TIFF:
- fmtName = "tiff_pipe";
- break;
- default:
- break;
- }
- fmt = (AVInputFormat*)av_find_input_format(fmtName);
- }
- const size_t avioCtxBufferSize = kIoBufferSize;
- uint8_t* avioCtxBuffer =
- (uint8_t*)av_malloc(avioCtxBufferSize + kIoPaddingSize);
- if (!avioCtxBuffer) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " av_malloc cannot allocate " << avioCtxBufferSize
- << " bytes";
- cleanUp();
- return false;
- }
- if (!(avioCtx_ = avio_alloc_context(
- avioCtxBuffer,
- avioCtxBufferSize,
- 0,
- reinterpret_cast<void*>(this),
- &Decoder::readFunction,
- nullptr,
- result == 1 ? &Decoder::seekFunction : nullptr))) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " avio_alloc_context failed";
- av_free(avioCtxBuffer);
- cleanUp();
- return false;
- }
- inputCtx_->pb = avioCtx_;
- inputCtx_->flags |= AVFMT_FLAG_CUSTOM_IO;
- }
- inputCtx_->opaque = reinterpret_cast<void*>(this);
- inputCtx_->interrupt_callback.callback = Decoder::shutdownFunction;
- inputCtx_->interrupt_callback.opaque = reinterpret_cast<void*>(this);
- // add network timeout
- inputCtx_->flags |= AVFMT_FLAG_NONBLOCK;
- AVDictionary* options = nullptr;
- if (params_.listen) {
- av_dict_set_int(&options, "listen", 1, 0);
- }
- if (params_.timeoutMs > 0) {
- av_dict_set_int(&options, "analyzeduration", params_.timeoutMs * 1000, 0);
- av_dict_set_int(&options, "stimeout", params_.timeoutMs * 1000, 0);
- av_dict_set_int(&options, "rw_timeout", params_.timeoutMs * 1000, 0);
- if (!params_.tlsCertFile.empty()) {
- av_dict_set(&options, "cert_file", params_.tlsCertFile.data(), 0);
- }
- if (!params_.tlsKeyFile.empty()) {
- av_dict_set(&options, "key_file", params_.tlsKeyFile.data(), 0);
- }
- }
- av_dict_set_int(&options, "probesize", params_.probeSize, 0);
- interrupted_ = false;
- // ffmpeg avformat_open_input call can hang if media source doesn't respond
- // set a guard for handle such situations, if requested
- std::promise<bool> p;
- std::future<bool> f = p.get_future();
- std::unique_ptr<std::thread> guard;
- if (params_.preventStaleness) {
- guard = std::make_unique<std::thread>([&f, this]() {
- auto timeout = std::chrono::milliseconds(params_.timeoutMs);
- if (std::future_status::timeout == f.wait_for(timeout)) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " cannot open stream within " << params_.timeoutMs
- << " ms";
- interrupted_ = true;
- }
- });
- }
- if (fmt) {
- result = avformat_open_input(&inputCtx_, nullptr, fmt, &options);
- } else {
- result =
- avformat_open_input(&inputCtx_, params_.uri.c_str(), nullptr, &options);
- }
- av_dict_free(&options);
- if (guard) {
- p.set_value(true);
- guard->join();
- guard.reset();
- }
- if (result < 0 || interrupted_) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " avformat_open_input failed, error="
- << Util::generateErrorDesc(result);
- cleanUp();
- return false;
- }
- result = avformat_find_stream_info(inputCtx_, nullptr);
- if (result < 0) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " avformat_find_stream_info failed, error="
- << Util::generateErrorDesc(result);
- cleanUp();
- return false;
- }
- if (!openStreams(metadata)) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid << " cannot activate streams";
- cleanUp();
- return false;
- }
- // SyncDecoder inherits Decoder which would override onInit.
- onInit();
- if (params.startOffset != 0) {
- auto offset = params.startOffset <= params.seekAccuracy
- ? 0
- : params.startOffset - params.seekAccuracy;
- av_seek_frame(inputCtx_, -1, offset, AVSEEK_FLAG_BACKWARD);
- }
- VLOG(1) << "Decoder initialized, log level: " << params_.logLevel;
- return true;
- }
- // open appropriate CODEC for every type of stream and move it to the class
- // variable `streams_` and make sure it is in range for decoding
- bool Decoder::openStreams(std::vector<DecoderMetadata>* metadata) {
- for (unsigned int i = 0; i < inputCtx_->nb_streams; i++) {
- // - find the corespondent format at params_.formats set
- MediaFormat format;
- #if LIBAVUTIL_VERSION_MAJOR < 56 // Before FFMPEG 4.0
- const auto media = inputCtx_->streams[i]->codec->codec_type;
- #else // FFMPEG 4.0+
- const auto media = inputCtx_->streams[i]->codecpar->codec_type;
- #endif
- if (!mapFfmpegType(media, &format.type)) {
- VLOG(1) << "Stream media: " << media << " at index " << i
- << " gets ignored, unknown type";
- continue; // unsupported type
- }
- // check format
- auto it = params_.formats.find(format);
- if (it == params_.formats.end()) {
- VLOG(1) << "Stream type: " << format.type << " at index: " << i
- << " gets ignored, caller is not interested";
- continue; // clients don't care about this media format
- }
- // do we have stream of this type?
- auto stream = findByType(format);
- // should we process this stream?
- if (it->stream == -2 || // all streams of this type are welcome
- (!stream && (it->stream == -1 || it->stream == i))) { // new stream
- VLOG(1) << "Stream type: " << format.type << " found, at index: " << i;
- auto stream = createStream(
- format.type,
- inputCtx_,
- i,
- params_.convertPtsToWallTime,
- it->format,
- params_.loggingUuid);
- CHECK(stream);
- if (stream->openCodec(metadata, params_.numThreads) < 0) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " open codec failed, stream_idx=" << i;
- return false;
- }
- streams_.emplace(i, std::move(stream));
- inRange_.set(i, true);
- }
- }
- return true;
- }
- void Decoder::shutdown() {
- cleanUp();
- }
- void Decoder::interrupt() {
- interrupted_ = true;
- }
- void Decoder::cleanUp() {
- if (!interrupted_) {
- interrupted_ = true;
- }
- if (inputCtx_) {
- for (auto& stream : streams_) {
- // Drain stream buffers.
- DecoderOutputMessage msg;
- while (msg.payload = nullptr, stream.second->flush(&msg, true) > 0) {
- }
- stream.second.reset();
- }
- streams_.clear();
- avformat_close_input(&inputCtx_);
- }
- if (avioCtx_) {
- av_freep(&avioCtx_->buffer);
- av_freep(&avioCtx_);
- }
- // reset callback
- seekableBuffer_.shutdown();
- }
- // function does actual work, derived class calls it in working thread
- // periodically. On success method returns 0, ENODATA on EOF, ETIMEDOUT if
- // no frames got decoded in the specified timeout time, AVERROR_BUFFER_TOO_SMALL
- // when unable to allocate packet and error on unrecoverable error
- int Decoder::getFrame(size_t workingTimeInMs) {
- if (inRange_.none()) {
- return ENODATA;
- }
- // decode frames until cache is full and leave thread
- // once decode() method gets called and grab some bytes
- // run this method again
- // init package
- // update 03/22: moving memory management to ffmpeg
- AVPacket* avPacket;
- avPacket = av_packet_alloc();
- if (avPacket == nullptr) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " decoder as not able to allocate the packet.";
- return AVERROR_BUFFER_TOO_SMALL;
- }
- avPacket->data = nullptr;
- avPacket->size = 0;
- auto end = std::chrono::steady_clock::now() +
- std::chrono::milliseconds(workingTimeInMs);
- // return true if elapsed time less than timeout
- auto watcher = [end]() -> bool {
- return std::chrono::steady_clock::now() <= end;
- };
- int result = 0;
- size_t decodingErrors = 0;
- bool decodedFrame = false;
- while (!interrupted_ && inRange_.any() && !decodedFrame) {
- if (watcher() == false) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid << " hit ETIMEDOUT";
- result = ETIMEDOUT;
- break;
- }
- result = av_read_frame(inputCtx_, avPacket);
- if (result == AVERROR(EAGAIN)) {
- VLOG(4) << "Decoder is busy...";
- std::this_thread::yield();
- result = 0; // reset error, EAGAIN is not an error at all
- // reset the packet to default settings
- av_packet_unref(avPacket);
- continue;
- } else if (result == AVERROR_EOF) {
- flushStreams();
- VLOG(1) << "End of stream";
- result = ENODATA;
- break;
- } else if (
- result == AVERROR(EPERM) && params_.skipOperationNotPermittedPackets) {
- // reset error, lets skip packets with EPERM
- result = 0;
- // reset the packet to default settings
- av_packet_unref(avPacket);
- continue;
- } else if (result < 0) {
- flushStreams();
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " error detected: " << Util::generateErrorDesc(result);
- break;
- }
- // get stream; if stream cannot be found reset the packet to
- // default settings
- auto stream = findByIndex(avPacket->stream_index);
- if (stream == nullptr || !inRange_.test(stream->getIndex())) {
- av_packet_unref(avPacket);
- continue;
- }
- size_t numConsecutiveNoBytes = 0;
- // it can be only partial decoding of the package bytes
- do {
- // decode package
- bool gotFrame = false;
- bool hasMsg = false;
- // packet either got consumed completely or not at all
- if ((result = processPacket(
- stream, avPacket, &gotFrame, &hasMsg, params_.fastSeek)) < 0) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " processPacket failed with code: " << result;
- break;
- }
- if (!gotFrame && params_.maxProcessNoBytes != 0 &&
- ++numConsecutiveNoBytes > params_.maxProcessNoBytes) {
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " exceeding max amount of consecutive no bytes";
- break;
- }
- if (result > 0) {
- numConsecutiveNoBytes = 0;
- }
- decodedFrame |= hasMsg;
- } while (result == 0);
- // post loop check
- if (result < 0) {
- if (params_.maxPackageErrors != 0 && // check errors
- ++decodingErrors >= params_.maxPackageErrors) { // reached the limit
- LOG(ERROR) << "uuid=" << params_.loggingUuid
- << " exceeding max amount of consecutive package errors";
- break;
- }
- } else {
- decodingErrors = 0; // reset on success
- }
- result = 0;
- av_packet_unref(avPacket);
- }
- av_packet_free(&avPacket);
- VLOG(2) << "Interrupted loop"
- << ", interrupted_ " << interrupted_ << ", inRange_.any() "
- << inRange_.any() << ", decodedFrame " << decodedFrame << ", result "
- << result;
- // loop can be terminated, either by:
- // 1. explicitly interrupted
- // 3. unrecoverable error or ENODATA (end of stream) or ETIMEDOUT (timeout)
- // 4. decoded frames pts are out of the specified range
- // 5. success decoded frame
- if (interrupted_) {
- return EINTR;
- }
- if (result != 0) {
- return result;
- }
- if (inRange_.none()) {
- return ENODATA;
- }
- return 0;
- }
- // find stream by stream index
- Stream* Decoder::findByIndex(int streamIndex) const {
- auto it = streams_.find(streamIndex);
- return it != streams_.end() ? it->second.get() : nullptr;
- }
- // find stream by type; note finds only the first stream of a given type
- Stream* Decoder::findByType(const MediaFormat& format) const {
- for (auto& stream : streams_) {
- if (stream.second->getMediaFormat().type == format.type) {
- return stream.second.get();
- }
- }
- return nullptr;
- }
- // given the stream and packet, decode the frame buffers into the
- // DecoderOutputMessage data structure via stream::decodePacket function.
- int Decoder::processPacket(
- Stream* stream,
- AVPacket* packet,
- bool* gotFrame,
- bool* hasMsg,
- bool fastSeek) {
- // decode package
- int result;
- DecoderOutputMessage msg;
- msg.payload = params_.headerOnly ? nullptr : createByteStorage(0);
- *hasMsg = false;
- if ((result = stream->decodePacket(
- packet, &msg, params_.headerOnly, gotFrame)) >= 0 &&
- *gotFrame) {
- // check end offset
- bool endInRange =
- params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
- inRange_.set(stream->getIndex(), endInRange);
- // if fastseek is enabled, we're returning the first
- // frame that we decode after (potential) seek.
- // By default, we perform accurate seek to the closest
- // following frame
- bool startCondition = true;
- if (!fastSeek) {
- startCondition = msg.header.pts >= params_.startOffset;
- }
- if (endInRange && startCondition) {
- *hasMsg = true;
- push(std::move(msg));
- }
- }
- return result;
- }
- void Decoder::flushStreams() {
- VLOG(1) << "Flushing streams...";
- for (auto& stream : streams_) {
- DecoderOutputMessage msg;
- while (msg.payload = (params_.headerOnly ? nullptr : createByteStorage(0)),
- stream.second->flush(&msg, params_.headerOnly) > 0) {
- // check end offset
- bool endInRange =
- params_.endOffset <= 0 || msg.header.pts <= params_.endOffset;
- inRange_.set(stream.second->getIndex(), endInRange);
- if (endInRange && msg.header.pts >= params_.startOffset) {
- push(std::move(msg));
- } else {
- msg.payload.reset();
- }
- }
- }
- }
- int Decoder::decode_all(const DecoderOutCallback& callback) {
- int result;
- do {
- DecoderOutputMessage out;
- if (0 == (result = decode(&out, params_.timeoutMs))) {
- callback(std::move(out));
- }
- } while (result == 0);
- return result;
- }
- } // namespace ffmpeg
|