[FFmpeg-devel] [PATCH 20/24] fftools/ffmpeg_dec: convert to the scheduler
Anton Khirnov
anton at khirnov.net
Sat Nov 4 09:56:29 EET 2023
---
fftools/ffmpeg.c | 22 ---
fftools/ffmpeg.h | 13 +-
fftools/ffmpeg_dec.c | 315 ++++++++++---------------------------------
3 files changed, 70 insertions(+), 280 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 611ac4621d..bd783fe674 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -778,11 +778,6 @@ static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo
int ret = 0;
int eof_reached = 0;
- if (ist->decoding_needed) {
- ret = dec_packet(ist, pkt, no_eof);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
- }
if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
eof_reached = 1;
@@ -994,18 +989,6 @@ static void reset_eagain(void)
ost->unavailable = 0;
}
-static void decode_flush(InputFile *ifile)
-{
- for (int i = 0; i < ifile->nb_streams; i++) {
- InputStream *ist = ifile->streams[i];
-
- if (ist->discard || !ist->decoding_needed)
- continue;
-
- dec_packet(ist, NULL, 1);
- }
-}
-
/*
* Return
* - 0 -- one packet was read and processed
@@ -1021,11 +1004,6 @@ static int process_input(int file_index, AVPacket *pkt)
ret = 0;
- if (ret == 1) {
- /* the input file is looped: flush the decoders */
- decode_flush(ifile);
- return AVERROR(EAGAIN);
- }
if (ret < 0) {
if (ret != AVERROR_EOF) {
av_log(ifile, AV_LOG_ERROR,
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 73b3e54fb0..975d8b737e 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -61,6 +61,8 @@
#define FFMPEG_OPT_TOP 1
#define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
enum VideoSyncMethod {
VSYNC_AUTO = -1,
VSYNC_PASSTHROUGH,
@@ -796,17 +798,6 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
int enc_alloc(Encoder **penc, const AVCodec *codec,
Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 53e14f061e..a81f83fc92 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -54,24 +54,6 @@ struct Decoder {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending coded packets from the main thread to
- * the decoder thread.
- *
- * An empty packet is sent to flush the decoder without terminating
- * decoding.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending decoded frames from the decoder thread
- * to the main thread.
- *
- * An empty frame is sent to signal that a single packet has been fully
- * processed.
- */
- ThreadQueue *queue_out;
};
// data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@ typedef struct DecThreadContext {
AVPacket *pkt;
} DecThreadContext;
-static int dec_thread_stop(Decoder *d)
-{
- void *ret;
-
- if (!d->queue_in)
- return 0;
-
- tq_send_finish(d->queue_in, 0);
- tq_receive_finish(d->queue_out, 0);
-
- pthread_join(d->thread, &ret);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
-
- return (intptr_t)ret;
-}
-
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
- dec_thread_stop(dec);
-
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -148,25 +110,6 @@ fail:
return AVERROR(ENOMEM);
}
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
- int i, ret = 0;
-
- for (i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_send_frame(ist->filters[i], decoded_frame,
- i < ist->nb_filters - 1 ||
- ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
- if (ret == AVERROR_EOF)
- ret = 0; /* ignore */
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Failed to inject frame into filter network: %s\n", av_err2str(ret));
- break;
- }
- }
- return ret;
-}
-
static AVRational audio_samplerate_update(void *logctx, Decoder *d,
const AVFrame *frame)
{
@@ -421,36 +364,31 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
- ret = send_frame_to_filters(ist, frame);
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
if (ret < 0)
- return ret;
+ av_frame_unref(frame);
- subtitle = (AVSubtitle*)frame->buf[0]->data;
- if (!subtitle->num_rects)
- return 0;
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
- if (ret < 0)
- return ret;
- }
-
- return 0;
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
}
static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
AVFrame *frame)
{
- Decoder *d = ist->decoder;
+ Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
int ret;
+ if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+ frame->pts = pkt->pts;
+ frame->time_base = pkt->time_base;
+ frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ }
+
if (!pkt) {
flush_pkt = av_packet_alloc();
if (!flush_pkt)
@@ -473,7 +411,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
ist->frames_decoded++;
- // XXX the queue for transferring data back to the main thread runs
+ // XXX the queue for transferring data to consumers runs
// on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
// inside the frame
// eventually, subtitles should be switched to use AVFrames natively
@@ -486,26 +424,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
frame->width = ist->dec_ctx->width;
frame->height = ist->dec_ctx->height;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- av_frame_unref(frame);
-
- return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- int i, ret;
-
- for (i = 0; i < ist->nb_filters; i++) {
- int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
- d->last_frame_pts + d->last_frame_duration_est;
- ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
- if (ret < 0)
- return ret;
- }
- return 0;
+ return process_subtitle(ist, frame);
}
static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -612,9 +531,11 @@ static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
ist->frames_decoded++;
- ret = tq_send(d->queue_out, 0, frame);
- if (ret < 0)
- return ret;
+ ret = sch_dec_send(d->sch, d->sch_idx, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+ }
}
}
@@ -656,7 +577,6 @@ fail:
void *decoder_thread(void *arg)
{
InputStream *ist = arg;
- InputFile *ifile = input_files[ist->file_index];
Decoder *d = ist->decoder;
DecThreadContext dt;
int ret = 0, input_status = 0;
@@ -668,19 +588,30 @@ void *decoder_thread(void *arg)
dec_thread_set_name(ist);
while (!input_status) {
- int dummy, flush_buffers;
+ int flush_buffers, have_data;
- input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
- flush_buffers = input_status >= 0 && !dt.pkt->buf;
- if (!dt.pkt->buf)
+ input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+ have_data = input_status >= 0 &&
+ (dt.pkt->buf || dt.pkt->side_data_elems ||
+ (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT);
+ flush_buffers = input_status >= 0 && !have_data;
+ if (!have_data)
av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
flush_buffers ? "flush" : "EOF");
- ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+ ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
av_packet_unref(dt.pkt);
av_frame_unref(dt.frame);
+ // AVERROR_EOF - EOF from the decoder
+ // AVERROR_EXIT - EOF from the scheduler
+ // we treat them differently when flushing
+ if (ret == AVERROR_EXIT) {
+ ret = AVERROR_EOF;
+ flush_buffers = 0;
+ }
+
if (ret == AVERROR_EOF) {
av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
flush_buffers ? "resetting" : "finishing");
@@ -688,11 +619,10 @@ void *decoder_thread(void *arg)
if (!flush_buffers)
break;
- /* report last frame duration to the demuxer thread */
+ /* report last frame duration to the scheduler */
if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
- Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
- .tb = d->last_frame_tb };
- av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+ dt.pkt->pts = d->last_frame_pts + d->last_frame_duration_est;
+ dt.pkt->time_base = d->last_frame_tb;
}
avcodec_flush_buffers(ist->dec_ctx);
@@ -701,149 +631,47 @@ void *decoder_thread(void *arg)
av_err2str(ret));
break;
}
-
- // signal to the consumer thread that the entire packet was processed
- ret = tq_send(d->queue_out, 0, dt.frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
- break;
- }
}
// EOF is normal thread termination
if (ret == AVERROR_EOF)
ret = 0;
+ // on success send EOF timestamp to our downstreams
+ if (ret >= 0) {
+ float err_rate;
+
+ av_frame_unref(dt.frame);
+
+ dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+ dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+ d->last_frame_pts + d->last_frame_duration_est;
+ dt.frame->time_base = d->last_frame_tb;
+
+ ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+ if (ret < 0 && ret != AVERROR_EOF) {
+ av_log(NULL, AV_LOG_FATAL,
+ "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+ goto finish;
+ }
+ ret = 0;
+
+ err_rate = (ist->frames_decoded || ist->decode_errors) ?
+ ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+ if (err_rate > max_error_rate) {
+ av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+ err_rate, max_error_rate);
+ ret = FFMPEG_ERROR_RATE_EXCEEDED;
+ } else if (err_rate)
+ av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+ }
+
finish:
- tq_receive_finish(d->queue_in, 0);
- tq_send_finish (d->queue_out, 0);
-
- // make sure the demuxer does not get stuck waiting for audio durations
- // that will never arrive
- if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
- av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
dec_thread_uninit(&dt);
- av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
return (void*)(intptr_t)ret;
}
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
- Decoder *d = ist->decoder;
- int ret = 0, thread_ret;
-
- // thread already joined
- if (!d->queue_in)
- return AVERROR_EOF;
-
- // send the packet/flush request/EOF to the decoder thread
- if (pkt || no_eof) {
- av_packet_unref(d->pkt);
-
- if (pkt) {
- ret = av_packet_ref(d->pkt, pkt);
- if (ret < 0)
- goto finish;
- }
-
- ret = tq_send(d->queue_in, 0, d->pkt);
- if (ret < 0)
- goto finish;
- } else
- tq_send_finish(d->queue_in, 0);
-
- // retrieve all decoded data for the packet
- while (1) {
- int dummy;
-
- ret = tq_receive(d->queue_out, &dummy, d->frame);
- if (ret < 0)
- goto finish;
-
- // packet fully processed
- if (!d->frame->buf[0])
- return 0;
-
- // process the decoded frame
- if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
- ret = process_subtitle(ist, d->frame);
- } else {
- ret = send_frame_to_filters(ist, d->frame);
- }
- av_frame_unref(d->frame);
- if (ret < 0)
- goto finish;
- }
-
-finish:
- thread_ret = dec_thread_stop(d);
- if (thread_ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
- av_err2str(thread_ret));
- ret = err_merge(ret, thread_ret);
- }
- // non-EOF errors here are all fatal
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
-
- // signal EOF to our downstreams
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- return ret;
- }
-
- return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
- Decoder *d = ist->decoder;
- ObjPool *op;
- int ret = 0;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- d->queue_in = tq_alloc(1, 1, op, pkt_move);
- if (!d->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- d->queue_out = tq_alloc(1, 4, op, frame_move);
- if (!d->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
- if (ret) {
- ret = AVERROR(ret);
- av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
- av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&d->queue_in);
- tq_free(&d->queue_out);
- return ret;
-}
-
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@@ -1095,12 +923,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
if (ret < 0)
return ret;
- ret = dec_thread_start(ist);
- if (ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
- av_err2str(ret));
- return ret;
- }
-
return 0;
}
--
2.42.0
More information about the ffmpeg-devel
mailing list