[FFmpeg-devel] [PATCH 21/24] fftools/ffmpeg_filter: convert to the scheduler
Anton Khirnov
anton at khirnov.net
Sat Nov 4 09:56:30 EET 2023
---
fftools/ffmpeg.c | 44 +--
fftools/ffmpeg.h | 32 +-
fftools/ffmpeg_filter.c | 720 +++++++++++-----------------------------
3 files changed, 204 insertions(+), 592 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index bd783fe674..1f21008588 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -138,30 +138,6 @@ static struct termios oldtty;
static int restore_tty;
#endif
-/* sub2video hack:
- Convert subtitles to video with alpha to insert them in filter graphs.
- This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
- /* When a frame is read from a file, examine all sub2video streams in
- the same file and send the sub2video frame again. Otherwise, decoded
- video frames could be accumulating in the filter graph while a filter
- (possibly overlay) is desperately waiting for a subtitle frame. */
- for (int i = 0; i < infile->nb_streams; i++) {
- InputStream *ist = infile->streams[i];
-
- if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
- continue;
-
- for (int j = 0; j < ist->nb_filters; j++)
- ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
- }
-}
-
-/* end of sub2video hack */
-
static void term_exit_sigsafe(void)
{
#if HAVE_TERMIOS_H
@@ -552,8 +528,8 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
if (is_last_report)
av_bprintf(&buf, "L");
- nb_frames_dup = ost->filter->nb_frames_dup;
- nb_frames_drop = ost->filter->nb_frames_drop;
+ nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup);
+ nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
vid = 1;
}
@@ -890,9 +866,7 @@ static int choose_output(OutputStream **post)
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
int64_t opts;
- if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
- opts = ost->filter->last_pts;
- } else {
+ {
opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
INT64_MIN : ost->last_mux_dts;
}
@@ -1041,8 +1015,6 @@ static int process_input(int file_index, AVPacket *pkt)
ist = ifile->streams[pkt->stream_index];
- sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
ret = process_input_packet(ist, pkt, 0);
av_packet_unref(pkt);
@@ -1061,8 +1033,6 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
int ret;
if (ost->filter) {
- if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
- return ret;
if (!ist)
return 0;
} else {
@@ -1078,14 +1048,6 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
if (ret < 0)
return ret == AVERROR_EOF ? 0 : ret;
- // process_input() above might have caused output to become available
- // in multiple filtergraphs, so we process all of them
- for (int i = 0; i < nb_filtergraphs; i++) {
- ret = reap_filters(filtergraphs[i], 0);
- if (ret < 0)
- return ret;
- }
-
return 0;
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 975d8b737e..c1b61c83e7 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -84,9 +84,7 @@ enum HWAccelID {
};
enum FrameOpaque {
- FRAME_OPAQUE_REAP_FILTERS = 1,
- FRAME_OPAQUE_CHOOSE_INPUT,
- FRAME_OPAQUE_SUB_HEARTBEAT,
+ FRAME_OPAQUE_SUB_HEARTBEAT = 1,
FRAME_OPAQUE_EOF,
FRAME_OPAQUE_SEND_COMMAND,
};
@@ -313,11 +311,8 @@ typedef struct OutputFilter {
enum AVMediaType type;
- /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
- int64_t last_pts;
-
- uint64_t nb_frames_dup;
- uint64_t nb_frames_drop;
+ atomic_uint_least64_t nb_frames_dup;
+ atomic_uint_least64_t nb_frames_drop;
} OutputFilter;
typedef struct FilterGraph {
@@ -728,10 +723,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
*/
FrameData *frame_data(AVFrame *frame);
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
/**
* Set up fallback filtering parameters from a decoder context. They will only
* be used if no frames are ever sent on this input, otherwise the actual
@@ -752,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in] graph filter graph to consider
- * @param[out] best_ist input stream where a frame would allow to continue
- * @return 0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters);
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return 0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index c01fc0e8ea..7e902670b4 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,8 +21,6 @@
#include <stdint.h>
#include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
#include "libavfilter/avfilter.h"
#include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@ typedef struct FilterGraphPriv {
// true when the filtergraph contains only meta filters
// that do not modify the frame data
int is_meta;
+ // source filters are present in the graph
+ int have_sources;
int disable_conversions;
- int nb_inputs_bound;
- int nb_outputs_bound;
+ unsigned nb_outputs_done;
const char *graph_desc;
@@ -67,41 +66,6 @@ typedef struct FilterGraphPriv {
Scheduler *sch;
unsigned sch_idx;
-
- pthread_t thread;
- /**
- * Queue for sending frames from the main thread to the filtergraph. Has
- * nb_inputs+1 streams - the first nb_inputs stream correspond to
- * filtergraph inputs. Frames on those streams may have their opaque set to
- * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
- * EOF event for the correspondint stream. Will be immediately followed by
- * this stream being send-closed.
- * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
- * a subtitle heartbeat event. Will only be sent for sub2video streams.
- *
- * The last stream is "control" - the main thread sends empty AVFrames with
- * opaque set to
- * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
- * from filtergraph outputs. These frames are sent to corresponding
- * streams in queue_out. Finally an empty frame is sent to the control
- * stream in queue_out.
- * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
- * available the terminating empty frame's opaque will contain the index+1
- * of the filtergraph input to which more input frames should be supplied.
- */
- ThreadQueue *queue_in;
- /**
- * Queue for sending frames from the filtergraph back to the main thread.
- * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
- * filtergraph outputs.
- *
- * The last stream is "control" - see documentation for queue_in for more
- * details.
- */
- ThreadQueue *queue_out;
- // submitting frames to filter thread returned EOF
- // this only happens on thread exit, so is not per-input
- int eof_in;
} FilterGraphPriv;
static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@ typedef struct FilterGraphThread {
// The output index is stored in frame opaque.
AVFifo *frame_queue_out;
+ // index of the next input to request from the scheduler
+ unsigned next_in;
+ // set to 1 after at least one frame passed through this output
int got_frame;
// EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@ typedef struct OutputFilterPriv {
int64_t ts_offset;
int64_t next_pts;
FPSConvContext fps;
-
- // set to 1 after at least one frame passed through this output
- int got_frame;
} OutputFilterPriv;
static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
static void *filter_thread(void *arg);
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
- FilterGraph *fg = &fgp->fg;
- ObjPool *op;
- int ret = 0;
-
- if (fgp->nb_inputs_bound < fg->nb_inputs ||
- fgp->nb_outputs_bound < fg->nb_outputs)
- return 0;
-
- op = objpool_alloc_frames();
- if (!op)
- return AVERROR(ENOMEM);
-
- fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
- if (!fgp->queue_in) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
- }
-
- // at least one output is mandatory
- op = objpool_alloc_frames();
- if (!op)
- goto fail;
-
- fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
- if (!fgp->queue_out) {
- objpool_free(&op);
- goto fail;
- }
-
- ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
- if (ret) {
- ret = AVERROR(ret);
- av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
- fg->index, av_err2str(ret));
- goto fail;
- }
-
- return 0;
-fail:
- if (ret >= 0)
- ret = AVERROR(ENOMEM);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return ret;
-}
-
static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
{
AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
ofilter->graph = fg;
ofp->format = -1;
ofp->index = fg->nb_outputs - 1;
- ofilter->last_pts = AV_NOPTS_VALUE;
return ofilter;
}
@@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
return AVERROR(ENOMEM);
}
- fgp->nb_inputs_bound++;
- av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
if (ret < 0)
return ret;
- fgp->nb_outputs_bound++;
- av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
- return fg_thread_try_start(fgp);
+ return 0;
}
static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
return ifilter;
}
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
- void *ret;
-
- if (!fgp->queue_in)
- return 0;
-
- for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
- InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
- ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
- if (ifp)
- ifp->eof = 1;
-
- tq_send_finish(fgp->queue_in, i);
- }
-
- for (int i = 0; i <= fgp->fg.nb_outputs; i++)
- tq_receive_finish(fgp->queue_out, i);
-
- pthread_join(fgp->thread, &ret);
-
- tq_free(&fgp->queue_in);
- tq_free(&fgp->queue_out);
-
- return (int)(intptr_t)ret;
-}
-
void fg_free(FilterGraph **pfg)
{
FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg)
return;
fgp = fgp_from_fg(fg);
- fg_thread_stop(fgp);
-
avfilter_graph_free(&fg->graph);
for (int j = 0; j < fg->nb_inputs; j++) {
InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
if (ret < 0)
goto fail;
+ for (unsigned i = 0; i < graph->nb_filters; i++) {
+ const AVFilter *f = graph->filters[i]->filter;
+ if (!avfilter_filter_pad_count(f, 0) &&
+ !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+ fgp->have_sources = 1;
+ break;
+ }
+ }
+
for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
InputFilter *const ifilter = ifilter_alloc(fg);
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
@@ -1792,6 +1677,7 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
AVBufferRef *hw_device;
AVFilterInOut *inputs, *outputs, *cur;
int ret, i, simple = filtergraph_is_simple(fg);
+ int have_input_eof = 0;
const char *graph_desc = fgp->graph_desc;
cleanup_filtergraph(fg);
@@ -1914,11 +1800,18 @@ static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
ret = av_buffersrc_add_frame(ifp->filter, NULL);
if (ret < 0)
goto fail;
+ have_input_eof = 1;
}
}
- return 0;
+ if (have_input_eof) {
+ // make sure the EOF propagates to the end of the graph
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+ goto fail;
+ }
+ return 0;
fail:
cleanup_filtergraph(fg);
return ret;
@@ -2174,7 +2067,7 @@ static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
fps->frames_prev_hist[2]);
if (!*nb_frames && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
fps->last_dropped++;
}
@@ -2252,21 +2145,23 @@ finish:
fps->frames_prev_hist[0] = *nb_frames_prev;
if (*nb_frames_prev == 0 && fps->last_dropped) {
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
av_log(ost, AV_LOG_VERBOSE,
"*** dropping frame %"PRId64" at ts %"PRId64"\n",
fps->frame_number, fps->last_frame->pts);
}
if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+ uint64_t nb_frames_dup;
if (*nb_frames > dts_error_threshold * 30) {
av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
- ofilter->nb_frames_drop++;
+ atomic_fetch_add(&ofilter->nb_frames_drop, 1);
*nb_frames = 0;
return;
}
- ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+ nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+ *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
- if (ofilter->nb_frames_dup > fps->dup_warning) {
+ if (nb_frames_dup > fps->dup_warning) {
av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
fps->dup_warning *= 10;
}
@@ -2276,8 +2171,57 @@ finish:
fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
}
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+ int ret;
+
+ // we are finished and no frames were ever seen at this output,
+ // at least initialize the encoder with a dummy frame
+ if (!fgt->got_frame) {
+ AVFrame *frame = fgt->frame;
+ FrameData *fd;
+
+ frame->time_base = ofp->tb_out;
+ frame->format = ofp->format;
+
+ frame->width = ofp->width;
+ frame->height = ofp->height;
+ frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+ frame->sample_rate = ofp->sample_rate;
+ if (ofp->ch_layout.nb_channels) {
+ ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+ if (ret < 0)
+ return ret;
+ }
+
+ fd = frame_data(frame);
+ if (!fd)
+ return AVERROR(ENOMEM);
+
+ fd->frame_rate_filter = ofp->fps.framerate;
+
+ av_assert0(!frame->buf[0]);
+
+ av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+ "No filtered frames for output stream, trying to "
+ "initialize anyway.\n");
+
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+ if (ret < 0) {
+ av_frame_unref(frame);
+ return ret;
+ }
+ }
+
+ fgt->eof_out[ofp->index] = 1;
+
+ return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
AVFrame *frame_prev = ofp->fps.last_frame;
@@ -2324,28 +2268,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
frame_out = frame;
}
- if (buffer) {
- AVFrame *f = av_frame_alloc();
-
- if (!f) {
- av_frame_unref(frame_out);
- return AVERROR(ENOMEM);
- }
-
- av_frame_move_ref(f, frame_out);
- f->opaque = (void*)(intptr_t)ofp->index;
-
- ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
- if (ret < 0) {
- av_frame_free(&f);
- return AVERROR(ENOMEM);
- }
- } else {
- // return the frame to the main thread
- ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+ {
+ // send the frame to consumers
+ ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
if (ret < 0) {
av_frame_unref(frame_out);
- fgt->eof_out[ofp->index] = 1;
+
+ if (!fgt->eof_out[ofp->index]) {
+ fgt->eof_out[ofp->index] = 1;
+ fgp->nb_outputs_done++;
+ }
+
return ret == AVERROR_EOF ? 0 : ret;
}
}
@@ -2366,16 +2299,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
av_frame_move_ref(frame_prev, frame);
}
- if (!frame) {
- tq_send_finish(fgp->queue_out, ofp->index);
- fgt->eof_out[ofp->index] = 1;
- }
+ if (!frame)
+ return close_output(ofp, fgt);
return 0;
}
static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
OutputStream *ost = ofp->ofilter.ost;
@@ -2385,8 +2316,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
ret = av_buffersink_get_frame_flags(filter, frame,
AV_BUFFERSINK_FLAG_NO_REQUEST);
- if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
- ret = fg_output_frame(ofp, fgt, NULL, buffer);
+ if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+ ret = fg_output_frame(ofp, fgt, NULL);
return (ret < 0) ? ret : 1;
} else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 1;
@@ -2440,7 +2371,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
fd->frame_rate_filter = ofp->fps.framerate;
}
- ret = fg_output_frame(ofp, fgt, frame, buffer);
+ ret = fg_output_frame(ofp, fgt, frame);
av_frame_unref(frame);
if (ret < 0)
return ret;
@@ -2448,44 +2379,68 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
return 0;
}
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
- AVFrame *frame, int buffer)
+ AVFrame *frame)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
- int ret = 0;
+ int did_step = 0;
- if (!fg->graph)
- return 0;
-
- // process buffered frames
- if (!buffer) {
- AVFrame *f;
-
- while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
- int out_idx = (intptr_t)f->opaque;
- f->opaque = NULL;
- ret = tq_send(fgp->queue_out, out_idx, f);
- av_frame_free(&f);
- if (ret < 0 && ret != AVERROR_EOF)
- return ret;
+ // graph not configured, just select the input to request
+ if (!fg->graph) {
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+ if (ifp->format < 0 && !fgt->eof_in[i]) {
+ fgt->next_in = i;
+ return 0;
+ }
}
+
+ // This state - graph is not configured, but all inputs are either
+ // initialized or EOF - should be unreachable because sending EOF to a
+ // filter without even a fallback format should fail
+ av_assert0(0);
+ return AVERROR_BUG;
}
- /* Reap all buffers present in the buffer sinks */
- for (int i = 0; i < fg->nb_outputs; i++) {
- OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
- int ret = 0;
+ while (1) {
+ int ret;
- while (!ret) {
- ret = fg_output_step(ofp, fgt, frame, buffer);
- if (ret < 0)
- return ret;
+ ret = avfilter_graph_request_oldest(fg->graph);
+ if (ret == AVERROR(EAGAIN)) {
+ fgt->next_in = choose_input(fg, fgt);
+ break;
+ } else if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+ else
+ av_log(fg, AV_LOG_ERROR,
+ "Error requesting a frame from the filtergraph: %s\n",
+ av_err2str(ret));
+ return ret;
}
- }
+ fgt->next_in = fg->nb_inputs;
- return 0;
+ // return after one iteration, so that scheduler can rate-control us
+ if (did_step && fgp->have_sources)
+ return 0;
+
+ /* Reap all buffers present in the buffer sinks */
+ for (int i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+ ret = 0;
+ while (!ret) {
+ ret = fg_output_step(ofp, fgt, frame);
+ if (ret < 0)
+ return ret;
+ }
+ }
+ did_step = 1;
+ };
+
+ return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
}
static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2561,6 +2516,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
+ if (fgt->eof_in[ifp->index])
+ return 0;
+
fgt->eof_in[ifp->index] = 1;
if (ifp->filter) {
@@ -2662,7 +2620,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return ret;
}
- ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+ ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
av_frame_free(&tmp);
if (ret < 0)
return ret;
@@ -2695,80 +2653,6 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
return 0;
}
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
- AVFrame *frame)
-{
- const enum FrameOpaque msg = (intptr_t)frame->opaque;
- FilterGraph *fg = &fgp->fg;
- int graph_eof = 0;
- int ret;
-
- frame->opaque = NULL;
- av_assert0(msg > 0);
- av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
- if (!fg->graph) {
- // graph not configured yet, ignore all messages other than choosing
- // the input to read from
- if (msg != FRAME_OPAQUE_CHOOSE_INPUT)
- goto done;
-
- for (int i = 0; i < fg->nb_inputs; i++) {
- InputFilter *ifilter = fg->inputs[i];
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- if (ifp->format < 0 && !fgt->eof_in[i]) {
- frame->opaque = (void*)(intptr_t)(i + 1);
- goto done;
- }
- }
-
- // This state - graph is not configured, but all inputs are either
- // initialized or EOF - should be unreachable because sending EOF to a
- // filter without even a fallback format should fail
- av_assert0(0);
- return AVERROR_BUG;
- }
-
- if (msg == FRAME_OPAQUE_SEND_COMMAND) {
- FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
- send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
- av_frame_unref(frame);
- goto done;
- }
-
- if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
- ret = avfilter_graph_request_oldest(fg->graph);
-
- graph_eof = ret == AVERROR_EOF;
-
- if (ret == AVERROR(EAGAIN)) {
- frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
- goto done;
- } else if (ret < 0 && !graph_eof)
- return ret;
- }
-
- ret = read_frames(fg, fgt, frame, 0);
- if (ret < 0) {
- av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
- return ret;
- }
-
- if (graph_eof)
- return AVERROR_EOF;
-
- // signal to the main thread that we are done processing the message
-done:
- ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
- if (ret < 0) {
- if (ret != AVERROR_EOF)
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
- return ret;
- }
-
- return 0;
-}
-
static void fg_thread_set_name(const FilterGraph *fg)
{
char name[16];
@@ -2855,294 +2739,94 @@ static void *filter_thread(void *arg)
InputFilter *ifilter;
InputFilterPriv *ifp;
enum FrameOpaque o;
- int input_idx, eof_frame;
+ unsigned input_idx = fgt.next_in;
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- if (input_idx < 0 ||
- (input_idx == fg->nb_inputs && input_status < 0)) {
+ input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+ &input_idx, fgt.frame);
+ if (input_status == AVERROR_EOF) {
av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
break;
+ } else if (input_status == AVERROR(EAGAIN)) {
+ // should only happen when we didn't request any input
+ av_assert0(input_idx == fg->nb_inputs);
+ goto read_frames;
}
+ av_assert0(input_status >= 0);
+
+ o = (intptr_t)fgt.frame->opaque;
o = (intptr_t)fgt.frame->opaque;
// message on the control stream
if (input_idx == fg->nb_inputs) {
- ret = msg_process(fgp, &fgt, fgt.frame);
- if (ret < 0)
- goto finish;
+ FilterCommand *fc;
+ av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+
+ fc = (FilterCommand*)fgt.frame->buf[0]->data;
+ send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+ fc->all_filters);
+ av_frame_unref(fgt.frame);
continue;
}
// we received an input frame or EOF
ifilter = fg->inputs[input_idx];
ifp = ifp_from_ifilter(ifilter);
- eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
- } else if (input_status >= 0 && fgt.frame->buf[0]) {
+ } else if (fgt.frame->buf[0]) {
ret = send_frame(fg, &fgt, ifilter, fgt.frame);
} else {
- int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
- AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
- ret = send_eof(&fgt, ifilter, pts, tb);
+ av_assert1(o == FRAME_OPAQUE_EOF);
+ ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
}
av_frame_unref(fgt.frame);
if (ret < 0)
+ goto finish;
+
+read_frames:
+ // retrieve all newly avalable frames
+ ret = read_frames(fg, &fgt, fgt.frame);
+ if (ret == AVERROR_EOF) {
+ av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
break;
-
- if (eof_frame) {
- // an EOF frame is immediately followed by sender closing
- // the corresponding stream, so retrieve that event
- input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
- av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
- }
-
- // signal to the main thread that we are done
- ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
- if (ret < 0) {
- if (ret == AVERROR_EOF)
- break;
-
- av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ } else if (ret < 0) {
+ av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+ av_err2str(ret));
goto finish;
}
}
+ for (unsigned i = 0; i < fg->nb_outputs; i++) {
+ OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+ if (fgt.eof_out[i])
+ continue;
+
+ ret = fg_output_frame(ofp, &fgt, NULL);
+ if (ret < 0)
+ goto finish;
+ }
+
finish:
// EOF is normal termination
if (ret == AVERROR_EOF)
ret = 0;
- for (int i = 0; i <= fg->nb_inputs; i++)
- tq_receive_finish(fgp->queue_in, i);
- for (int i = 0; i <= fg->nb_outputs; i++)
- tq_send_finish(fgp->queue_out, i);
-
fg_thread_uninit(&fgt);
- av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
return (void*)(intptr_t)ret;
}
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
- AVFrame *frame, enum FrameOpaque type)
-{
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- int output_idx, ret;
-
- if (ifp->eof) {
- av_frame_unref(frame);
- return AVERROR_EOF;
- }
-
- frame->opaque = (void*)(intptr_t)type;
-
- ret = tq_send(fgp->queue_in, ifp->index, frame);
- if (ret < 0) {
- ifp->eof = 1;
- av_frame_unref(frame);
- return ret;
- }
-
- if (type == FRAME_OPAQUE_EOF)
- tq_send_finish(fgp->queue_in, ifp->index);
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
- return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- if (keep_reference) {
- ret = av_frame_ref(fgp->frame, frame);
- if (ret < 0)
- return ret;
- } else
- av_frame_move_ref(fgp->frame, frame);
-
- return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
- return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
- FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
- fgp->frame->pts = pts;
- fgp->frame->time_base = tb;
-
- thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
- FilterGraphPriv *fgp = fgp_from_fg(graph);
- int ret, got_frames = 0;
-
- if (fgp->eof_in)
- return AVERROR_EOF;
-
- // signal to the filtering thread to return all frames it can
- av_assert0(!fgp->frame->buf[0]);
- fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
- FRAME_OPAQUE_CHOOSE_INPUT :
- FRAME_OPAQUE_REAP_FILTERS);
-
- ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
- if (ret < 0) {
- fgp->eof_in = 1;
- goto finish;
- }
-
- while (1) {
- OutputFilter *ofilter;
- OutputFilterPriv *ofp;
- OutputStream *ost;
- int output_idx;
-
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
- // EOF on the whole queue or the control stream
- if (output_idx < 0 ||
- (ret < 0 && output_idx == graph->nb_outputs))
- goto finish;
-
- // EOF for a specific stream
- if (ret < 0) {
- ofilter = graph->outputs[output_idx];
- ofp = ofp_from_ofilter(ofilter);
-
- // we are finished and no frames were ever seen at this output,
- // at least initialize the encoder with a dummy frame
- if (!ofp->got_frame) {
- AVFrame *frame = fgp->frame;
- FrameData *fd;
-
- frame->time_base = ofp->tb_out;
- frame->format = ofp->format;
-
- frame->width = ofp->width;
- frame->height = ofp->height;
- frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
- frame->sample_rate = ofp->sample_rate;
- if (ofp->ch_layout.nb_channels) {
- ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
- if (ret < 0)
- return ret;
- }
-
- fd = frame_data(frame);
- if (!fd)
- return AVERROR(ENOMEM);
-
- fd->frame_rate_filter = ofp->fps.framerate;
-
- av_assert0(!frame->buf[0]);
-
- av_log(ofilter->ost, AV_LOG_WARNING,
- "No filtered frames for output stream, trying to "
- "initialize anyway.\n");
-
- enc_open(ofilter->ost, frame);
- av_frame_unref(frame);
- }
-
- close_output_stream(graph->outputs[output_idx]->ost);
- continue;
- }
-
- // request was fully processed by the filtering thread,
- // return the input stream to read from, if needed
- if (output_idx == graph->nb_outputs) {
- int input_idx = (intptr_t)fgp->frame->opaque - 1;
- av_assert0(input_idx <= graph->nb_inputs);
-
- if (best_ist) {
- *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
- ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
- if (input_idx < 0 && !got_frames) {
- for (int i = 0; i < graph->nb_outputs; i++)
- graph->outputs[i]->ost->unavailable = 1;
- }
- }
- break;
- }
-
- // got a frame from the filtering thread, send it for encoding
- ofilter = graph->outputs[output_idx];
- ost = ofilter->ost;
- ofp = ofp_from_ofilter(ofilter);
-
- if (ost->finished) {
- av_frame_unref(fgp->frame);
- tq_receive_finish(fgp->queue_out, output_idx);
- continue;
- }
-
- if (fgp->frame->pts != AV_NOPTS_VALUE) {
- ofilter->last_pts = av_rescale_q(fgp->frame->pts,
- fgp->frame->time_base,
- AV_TIME_BASE_Q);
- }
-
- ret = enc_frame(ost, fgp->frame);
- av_frame_unref(fgp->frame);
- if (ret < 0)
- goto finish;
-
- ofp->got_frame = 1;
- got_frames = 1;
- }
-
-finish:
- if (ret < 0) {
- fgp->eof_in = 1;
- for (int i = 0; i < graph->nb_outputs; i++)
- close_output_stream(graph->outputs[i]->ost);
- }
-
- return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
- return fg_transcode_step(fg, NULL);
-}
-
void fg_send_command(FilterGraph *fg, double time, const char *target,
const char *command, const char *arg, int all_filters)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
AVBufferRef *buf;
FilterCommand *fc;
- int output_idx, ret;
-
- if (!fgp->queue_in)
- return;
fc = av_mallocz(sizeof(*fc));
if (!fc)
@@ -3168,13 +2852,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
fgp->frame->buf[0] = buf;
fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
- ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame);
- if (ret < 0) {
- av_frame_unref(fgp->frame);
- return;
- }
-
- // wait for the frame to be processed
- ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
- av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+ sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
}
--
2.42.0
More information about the ffmpeg-devel
mailing list