[FFmpeg-cvslog] applehttp: Restructure the demuxer to use a custom AVIOContext
Martin Storsjö
git at videolan.org
Tue Apr 5 02:33:09 CEST 2011
ffmpeg | branch: master | Martin Storsjö <martin at martin.st> | Mon Mar 21 00:21:56 2011 +0200| [6cc7f1398257d4ffa89f79d52f10b2cabd9ad232] | committer: Martin Storsjö
applehttp: Restructure the demuxer to use a custom AVIOContext
This avoids issues where EOF at the end of the segment is given to
the variant demuxer. Now the demuxers only see one single data
stream (as when using the applehttp protocol handler).
> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=6cc7f1398257d4ffa89f79d52f10b2cabd9ad232
---
libavformat/applehttp.c | 332 +++++++++++++++++++++++-----------------------
1 files changed, 166 insertions(+), 166 deletions(-)
diff --git a/libavformat/applehttp.c b/libavformat/applehttp.c
index 815013d..7ab3ab7 100644
--- a/libavformat/applehttp.c
+++ b/libavformat/applehttp.c
@@ -30,6 +30,9 @@
#include "avformat.h"
#include "internal.h"
#include <unistd.h>
+#include "avio_internal.h"
+
+#define INITIAL_BUFFER_SIZE 32768
/*
* An apple http stream consists of a playlist with media segment files,
@@ -56,7 +59,11 @@ struct segment {
struct variant {
int bandwidth;
char url[MAX_URL_SIZE];
- AVIOContext *pb;
+ AVIOContext pb;
+ uint8_t* read_buffer;
+ URLContext *input;
+ AVFormatContext *parent;
+ int index;
AVFormatContext *ctx;
AVPacket pkt;
int stream_offset;
@@ -66,16 +73,17 @@ struct variant {
int start_seq_no;
int n_segments;
struct segment **segments;
- int needed;
+ int needed, cur_needed;
+ int cur_seq_no;
+ int64_t last_load_time;
};
typedef struct AppleHTTPContext {
int n_variants;
struct variant **variants;
int cur_seq_no;
- int64_t last_load_time;
- int64_t last_packet_dts;
- int max_start_seq, min_end_seq;
+ int end_of_segment;
+ int first_packet;
} AppleHTTPContext;
static int read_chomp_line(AVIOContext *s, char *buf, int maxlen)
@@ -102,8 +110,9 @@ static void free_variant_list(AppleHTTPContext *c)
struct variant *var = c->variants[i];
free_segment_list(var);
av_free_packet(&var->pkt);
- if (var->pb)
- avio_close(var->pb);
+ av_free(var->pb.buffer);
+ if (var->input)
+ url_close(var->input);
if (var->ctx) {
var->ctx->pb = NULL;
av_close_input_file(var->ctx);
@@ -238,7 +247,8 @@ static int parse_playlist(AppleHTTPContext *c, const char *url,
}
}
}
- c->last_load_time = av_gettime();
+ if (var)
+ var->last_load_time = av_gettime();
fail:
if (close_in)
@@ -246,6 +256,71 @@ fail:
return ret;
}
+static int read_data(void *opaque, uint8_t *buf, int buf_size) /* read callback of a variant's AVIOContext; returns bytes read or a negative AVERROR code */
+{
+ struct variant *v = opaque; /* opaque is the variant handed to ffio_init_context() */
+ AppleHTTPContext *c = v->parent->priv_data;
+ int ret, i;
+
+restart:
+ if (!v->input) { /* no segment is open: pick and open the next one */
+reload:
+ /* If this is a live stream and target_duration has elapsed since
+ * the last playlist reload, reload this variant's playlist now. */
+ if (!v->finished &&
+ av_gettime() - v->last_load_time >= v->target_duration*1000000 &&
+ (ret = parse_playlist(c, v->url, v, NULL)) < 0)
+ return ret;
+ if (v->cur_seq_no < v->start_seq_no) { /* wanted segment is older than the first listed one */
+ av_log(NULL, AV_LOG_WARNING,
+ "skipping %d segments ahead, expired from playlists\n",
+ v->start_seq_no - v->cur_seq_no);
+ v->cur_seq_no = v->start_seq_no;
+ }
+ if (v->cur_seq_no >= v->start_seq_no + v->n_segments) { /* wanted segment is past the last listed one */
+ if (v->finished)
+ return AVERROR_EOF; /* not a live stream: no further segments will appear */
+ while (av_gettime() - v->last_load_time <
+ v->target_duration*1000000) {
+ if (url_interrupt_cb())
+ return AVERROR_EXIT;
+ usleep(100*1000); /* sleep 100 ms between interrupt checks */
+ }
+ /* Enough time has elapsed since the last reload */
+ goto reload;
+ }
+
+ ret = url_open(&v->input,
+ v->segments[v->cur_seq_no - v->start_seq_no]->url,
+ URL_RDONLY);
+ if (ret < 0)
+ return ret;
+ }
+ ret = url_read(v->input, buf, buf_size);
+ if (ret > 0) /* got data from the current segment */
+ return ret;
+ if (ret < 0 && ret != AVERROR_EOF) /* genuine read error: propagate it */
+ return ret;
+ url_close(v->input); /* ret is 0 or AVERROR_EOF here: the segment is treated as finished */
+ v->input = NULL;
+ v->cur_seq_no++;
+
+ c->end_of_segment = 1; /* checked by applehttp_read_packet() to trigger recheck_discard_flags() */
+ c->cur_seq_no = v->cur_seq_no; /* recheck_discard_flags() copies this into variants it re-enables */
+
+ v->needed = 0; /* recomputed below from the parent streams' discard flags */
+ for (i = v->stream_offset; i < v->stream_offset + v->ctx->nb_streams; i++) { /* NOTE(review): v->ctx is only assigned by av_open_input_stream(); confirm a segment cannot end while av_probe_input_buffer() is still reading */
+ if (v->parent->streams[i]->discard < AVDISCARD_ALL)
+ v->needed = 1;
+ }
+ if (!v->needed) {
+ av_log(v->parent, AV_LOG_INFO, "No longer receiving variant %d\n",
+ v->index);
+ return AVERROR_EOF; /* every stream of this variant is discarded: stop reading it */
+ }
+ goto restart; /* still needed: continue with the next segment */
+}
+
static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
{
AppleHTTPContext *c = s->priv_data;
@@ -284,20 +359,35 @@ static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
s->duration = duration * AV_TIME_BASE;
}
- c->min_end_seq = INT_MAX;
/* Open the demuxer for each variant */
for (i = 0; i < c->n_variants; i++) {
struct variant *v = c->variants[i];
+ AVInputFormat *in_fmt = NULL;
if (v->n_segments == 0)
continue;
- c->max_start_seq = FFMAX(c->max_start_seq, v->start_seq_no);
- c->min_end_seq = FFMIN(c->min_end_seq, v->start_seq_no +
- v->n_segments);
- ret = av_open_input_file(&v->ctx, v->segments[0]->url, NULL, 0, NULL);
+
+ v->index = i;
+ v->needed = 1;
+ v->parent = s;
+
+ /* If this is a live stream with more than 3 segments, start at the
+ * third last segment. */
+ v->cur_seq_no = v->start_seq_no;
+ if (!v->finished && v->n_segments > 3)
+ v->cur_seq_no = v->start_seq_no + v->n_segments - 3;
+
+ v->read_buffer = av_malloc(INITIAL_BUFFER_SIZE);
+ ffio_init_context(&v->pb, v->read_buffer, INITIAL_BUFFER_SIZE, 0, v,
+ read_data, NULL, NULL);
+ v->pb.seekable = 0;
+ ret = av_probe_input_buffer(&v->pb, &in_fmt, v->segments[0]->url,
+ NULL, 0, 0);
+ if (ret < 0)
+ goto fail;
+ ret = av_open_input_stream(&v->ctx, &v->pb, v->segments[0]->url,
+ in_fmt, NULL);
if (ret < 0)
goto fail;
- avio_close(v->ctx->pb);
- v->ctx->pb = NULL;
v->stream_offset = stream_offset;
/* Create new AVStreams for each stream in this variant */
for (j = 0; j < v->ctx->nb_streams; j++) {
@@ -310,13 +400,8 @@ static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
}
stream_offset += v->ctx->nb_streams;
}
- c->last_packet_dts = AV_NOPTS_VALUE;
- c->cur_seq_no = c->max_start_seq;
- /* If this is a live stream with more than 3 segments, start at the
- * third last segment. */
- if (!c->variants[0]->finished && c->min_end_seq - c->max_start_seq > 3)
- c->cur_seq_no = c->min_end_seq - 2;
+ c->first_packet = 1;
return 0;
fail:
@@ -324,98 +409,61 @@ fail:
return ret;
}
-static int open_variant(AppleHTTPContext *c, struct variant *var, int skip)
+static int recheck_discard_flags(AVFormatContext *s, int first)
{
- int ret;
+ AppleHTTPContext *c = s->priv_data;
+ int i, changed = 0;
- if (c->cur_seq_no < var->start_seq_no) {
- av_log(NULL, AV_LOG_WARNING,
- "seq %d not available in variant %s, skipping\n",
- var->start_seq_no, var->url);
- return 0;
+ /* Check if any new streams are needed */
+ for (i = 0; i < c->n_variants; i++)
+ c->variants[i]->cur_needed = 0;;
+
+ for (i = 0; i < s->nb_streams; i++) {
+ AVStream *st = s->streams[i];
+ struct variant *var = c->variants[s->streams[i]->id];
+ if (st->discard < AVDISCARD_ALL)
+ var->cur_needed = 1;
}
- if (c->cur_seq_no - var->start_seq_no >= var->n_segments)
- return c->variants[0]->finished ? AVERROR_EOF : 0;
- ret = avio_open(&var->pb,
- var->segments[c->cur_seq_no - var->start_seq_no]->url,
- URL_RDONLY);
- if (ret < 0)
- return ret;
- var->ctx->pb = var->pb;
- /* If this is a new segment in parallel with another one already opened,
- * skip ahead so they're all at the same dts. */
- if (skip && c->last_packet_dts != AV_NOPTS_VALUE) {
- while (1) {
- ret = av_read_frame(var->ctx, &var->pkt);
- if (ret < 0) {
- if (ret == AVERROR_EOF) {
- reset_packet(&var->pkt);
- return 0;
- }
- return ret;
- }
- if (var->pkt.dts >= c->last_packet_dts)
- break;
- av_free_packet(&var->pkt);
+ for (i = 0; i < c->n_variants; i++) {
+ struct variant *v = c->variants[i];
+ if (v->cur_needed && !v->needed) {
+ v->needed = 1;
+ changed = 1;
+ v->cur_seq_no = c->cur_seq_no;
+ v->pb.eof_reached = 0;
+ av_log(s, AV_LOG_INFO, "Now receiving variant %d\n", i);
+ } else if (first && !v->cur_needed && v->needed) {
+ if (v->input)
+ url_close(v->input);
+ v->input = NULL;
+ v->needed = 0;
+ changed = 1;
+ av_log(s, AV_LOG_INFO, "No longer receiving variant %d\n", i);
}
}
- return 0;
+ return changed;
}
static int applehttp_read_packet(AVFormatContext *s, AVPacket *pkt)
{
AppleHTTPContext *c = s->priv_data;
- int ret, i, minvariant = -1, first = 1, needed = 0, changed = 0,
- variants = 0;
+ int ret, i, minvariant = -1;
- /* Recheck the discard flags - which streams are desired at the moment */
- for (i = 0; i < c->n_variants; i++)
- c->variants[i]->needed = 0;
- for (i = 0; i < s->nb_streams; i++) {
- AVStream *st = s->streams[i];
- struct variant *var = c->variants[s->streams[i]->id];
- if (st->discard < AVDISCARD_ALL) {
- var->needed = 1;
- needed++;
- }
- /* Copy the discard flag to the chained demuxer, to indicate which
- * streams are desired. */
- var->ctx->streams[i - var->stream_offset]->discard = st->discard;
+ if (c->first_packet) {
+ recheck_discard_flags(s, 1);
+ c->first_packet = 0;
}
- if (!needed)
- return AVERROR_EOF;
+
start:
+ c->end_of_segment = 0;
for (i = 0; i < c->n_variants; i++) {
struct variant *var = c->variants[i];
- /* Close unneeded streams, open newly requested streams */
- if (var->pb && !var->needed) {
- av_log(s, AV_LOG_DEBUG,
- "Closing variant stream %d, no longer needed\n", i);
- av_free_packet(&var->pkt);
- reset_packet(&var->pkt);
- avio_close(var->pb);
- var->pb = NULL;
- changed = 1;
- } else if (!var->pb && var->needed) {
- if (first)
- av_log(s, AV_LOG_DEBUG, "Opening variant stream %d\n", i);
- if (first && !var->finished)
- if ((ret = parse_playlist(c, var->url, var, NULL)) < 0)
- return ret;
- ret = open_variant(c, var, first);
- if (ret < 0)
- return ret;
- changed = 1;
- }
- /* Count the number of open variants */
- if (var->pb)
- variants++;
/* Make sure we've got one buffered packet from each open variant
* stream */
- if (var->pb && !var->pkt.data) {
+ if (var->needed && !var->pkt.data) {
ret = av_read_frame(var->ctx, &var->pkt);
if (ret < 0) {
- if (!var->pb->eof_reached)
+ if (!var->pb.eof_reached)
return ret;
reset_packet(&var->pkt);
}
@@ -427,71 +475,18 @@ start:
minvariant = i;
}
}
- if (first && changed)
- av_log(s, AV_LOG_INFO, "Receiving %d variant streams\n", variants);
+ if (c->end_of_segment) {
+ if (recheck_discard_flags(s, 0))
+ goto start;
+ }
/* If we got a packet, return it */
if (minvariant >= 0) {
*pkt = c->variants[minvariant]->pkt;
pkt->stream_index += c->variants[minvariant]->stream_offset;
reset_packet(&c->variants[minvariant]->pkt);
- c->last_packet_dts = pkt->dts;
return 0;
}
- /* No more packets - eof reached in all variant streams, close the
- * current segments. */
- for (i = 0; i < c->n_variants; i++) {
- struct variant *var = c->variants[i];
- if (var->pb) {
- avio_close(var->pb);
- var->pb = NULL;
- }
- }
- /* Indicate that we're opening the next segment, not opening a new
- * variant stream in parallel, so we shouldn't try to skip ahead. */
- first = 0;
- c->cur_seq_no++;
-reload:
- if (!c->variants[0]->finished) {
- /* If this is a live stream and target_duration has elapsed since
- * the last playlist reload, reload the variant playlists now. */
- int64_t now = av_gettime();
- if (now - c->last_load_time >= c->variants[0]->target_duration*1000000) {
- c->max_start_seq = 0;
- c->min_end_seq = INT_MAX;
- for (i = 0; i < c->n_variants; i++) {
- struct variant *var = c->variants[i];
- if (var->needed) {
- if ((ret = parse_playlist(c, var->url, var, NULL)) < 0)
- return ret;
- c->max_start_seq = FFMAX(c->max_start_seq,
- var->start_seq_no);
- c->min_end_seq = FFMIN(c->min_end_seq,
- var->start_seq_no + var->n_segments);
- }
- }
- }
- }
- if (c->cur_seq_no < c->max_start_seq) {
- av_log(NULL, AV_LOG_WARNING,
- "skipping %d segments ahead, expired from playlists\n",
- c->max_start_seq - c->cur_seq_no);
- c->cur_seq_no = c->max_start_seq;
- }
- /* If more segments exist, open the next one */
- if (c->cur_seq_no < c->min_end_seq)
- goto start;
- /* We've reached the end of the playlists - return eof if this is a
- * non-live stream, wait until the next playlist reload if it is live. */
- if (c->variants[0]->finished)
- return AVERROR_EOF;
- while (av_gettime() - c->last_load_time <
- c->variants[0]->target_duration*1000000) {
- if (url_interrupt_cb())
- return AVERROR_EXIT;
- usleep(100*1000);
- }
- /* Enough time has elapsed since the last reload */
- goto reload;
+ return AVERROR_EOF;
}
static int applehttp_close(AVFormatContext *s)
@@ -506,38 +501,43 @@ static int applehttp_read_seek(AVFormatContext *s, int stream_index,
int64_t timestamp, int flags)
{
AppleHTTPContext *c = s->priv_data;
- int64_t pos = 0;
- int i;
- struct variant *var = c->variants[0];
+ int i, j, ret;
if ((flags & AVSEEK_FLAG_BYTE) || !c->variants[0]->finished)
return AVERROR(ENOSYS);
/* Reset the variants */
- c->last_packet_dts = AV_NOPTS_VALUE;
for (i = 0; i < c->n_variants; i++) {
struct variant *var = c->variants[i];
- if (var->pb) {
- avio_close(var->pb);
- var->pb = NULL;
+ if (var->input) {
+ url_close(var->input);
+ var->input = NULL;
}
av_free_packet(&var->pkt);
reset_packet(&var->pkt);
+ var->pb.eof_reached = 0;
}
timestamp = av_rescale_rnd(timestamp, 1, stream_index >= 0 ?
s->streams[stream_index]->time_base.den :
AV_TIME_BASE, flags & AVSEEK_FLAG_BACKWARD ?
AV_ROUND_DOWN : AV_ROUND_UP);
- /* Locate the segment that contains the target timestamp */
- for (i = 0; i < var->n_segments; i++) {
- if (timestamp >= pos && timestamp < pos + var->segments[i]->duration) {
- c->cur_seq_no = var->start_seq_no + i;
- return 0;
+ ret = AVERROR(EIO);
+ for (i = 0; i < c->n_variants; i++) {
+ struct variant *var = c->variants[i];
+ int64_t pos = 0;
+ /* Locate the segment that contains the target timestamp */
+ for (j = 0; j < var->n_segments; j++) {
+ if (timestamp >= pos &&
+ timestamp < pos + var->segments[j]->duration) {
+ var->cur_seq_no = var->start_seq_no + j;
+ ret = 0;
+ break;
+ }
+ pos += var->segments[j]->duration;
}
- pos += var->segments[i]->duration;
}
- return AVERROR(EIO);
+ return ret;
}
static int applehttp_probe(AVProbeData *p)
More information about the ffmpeg-cvslog
mailing list