[FFmpeg-devel] [PATCH 3/4] ffserver: Implement ffserver and add Makefile
Stephan Holljes
klaxa1337 at googlemail.com
Thu Apr 12 16:35:48 EEST 2018
---
Makefile | 15 ++
ffserver.c | 451 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 466 insertions(+)
create mode 100644 Makefile
create mode 100644 ffserver.c
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a57393a
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,15 @@
+# Build the ffserver binary from its two helper objects and ffserver.c.
+.PHONY: all clean
+all: ffserver
+LAV_FLAGS = $(shell pkg-config --libs --cflags libavformat libavcodec libavutil)
+# LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil
+
+# Libraries go after the objects/sources that reference them so that linkers
+# running with --as-needed do not discard them.
+ffserver: segment.o publisher.o ffserver.c
+	cc -g -Wall -o ffserver segment.o publisher.o ffserver.c $(LAV_FLAGS) -lpthread
+
+segment.o: segment.c segment.h
+	cc -g -Wall $(LAV_FLAGS) -lpthread -c segment.c
+
+publisher.o: publisher.c publisher.h
+	cc -g -Wall $(LAV_FLAGS) -lpthread -c publisher.c
+
+# Remove build products; -f keeps this from failing on a clean tree.
+clean:
+	rm -f *.o ffserver
diff --git a/ffserver.c b/ffserver.c
new file mode 100644
index 0000000..ecdcc64
--- /dev/null
+++ b/ffserver.c
@@ -0,0 +1,451 @@
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <libavutil/log.h>
+#include <libavutil/timestamp.h>
+#include <libavutil/time.h>
+#include <libavutil/opt.h>
+#include <libavformat/avformat.h>
+#include <libavcodec/avcodec.h>
+
+#include "segment.h"
+#include "publisher.h"
+
+#define BUFFER_SECS 30
+#define LISTEN_TIMEOUT_MSEC 1000
+
+/* Arguments handed to read_thread(). */
+struct ReadInfo {
+    struct PublisherContext *pub;  /* publisher that receives finished segments */
+    AVFormatContext *ifmt_ctx;     /* already opened input; closed by read_thread() */
+    const char *in_filename;       /* input URL; may point at a string literal, never written through */
+};
+
+/* Arguments handed to one write_thread() instance. */
+struct WriteInfo {
+ /* Publisher whose client slots this thread scans. */
+ struct PublisherContext *pub;
+ /* Index assigned by main(); write_thread() only uses it in its log line. */
+ int thread_id;
+};
+
+/* Arguments handed to accept_thread(). */
+struct AcceptInfo {
+ /* Publisher that client slots are reserved on and new clients are added to. */
+ struct PublisherContext *pub;
+ /* Input context whose streams are mirrored into every client's output context. */
+ AVFormatContext *ifmt_ctx;
+ /* URL passed to avio_open2() to open the listening server. */
+ const char *out_uri;
+};
+
+
+/*
+ * Reader thread: demux the input and cut it into segments for the publisher.
+ *
+ * arg is a struct ReadInfo *. A new segment is started at every key frame of
+ * the first video stream; the finished segment is pushed to info->pub and
+ * published. Packets are paced against the wall clock so that file inputs are
+ * delivered at (roughly) real-time speed.
+ *
+ * On exit, for any reason, the input is closed and info->pub->shutdown is set
+ * to 1. Always returns NULL.
+ */
+void *read_thread(void *arg)
+{
+    struct ReadInfo *info = (struct ReadInfo*) arg;
+    AVFormatContext *ifmt_ctx = info->ifmt_ctx;
+    AVRational tb = { 1, AV_TIME_BASE };
+    struct Segment *seg = NULL;
+    AVStream *in_stream;
+    AVPacket pkt;
+    int64_t pts, now, start;
+    unsigned int i;
+    int video_idx = -1;
+    int id = 0;
+    int ret;
+
+    if ((ret = avformat_find_stream_info(ifmt_ctx, NULL)) < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Could not get input stream info.\n");
+        goto end;
+    }
+
+    // The media type is available directly from codecpar; no codec context
+    // has to be allocated (and freed) just to look at it.
+    av_log(NULL, AV_LOG_INFO, "Finding video stream.\n");
+    for (i = 0; i < ifmt_ctx->nb_streams; i++) {
+        av_log(NULL, AV_LOG_DEBUG, "Checking stream %u\n", i);
+        if (ifmt_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
+            video_idx = (int) i;
+            break;
+        }
+    }
+    if (video_idx == -1) {
+        av_log(NULL, AV_LOG_ERROR, "No video stream found.\n");
+        goto end;
+    }
+
+    // All information needed to start segmenting the file is gathered now.
+    // start BUFFER_SECS seconds "in the past" to "catch up" to real-time. Has no effect on streamed sources.
+    start = av_gettime_relative() - BUFFER_SECS * AV_TIME_BASE;
+
+    // segmenting main-loop
+    for (;;) {
+        ret = av_read_frame(ifmt_ctx, &pkt);
+        if (ret < 0)
+            break;
+
+        in_stream = ifmt_ctx->streams[pkt.stream_index];
+        if (pkt.pts == AV_NOPTS_VALUE) {
+            pkt.pts = 0;
+        }
+        if (pkt.dts == AV_NOPTS_VALUE) {
+            pkt.dts = 0;
+        }
+
+        // current pts
+        pts = av_rescale_q(pkt.pts, in_stream->time_base, tb);
+
+        // current stream "uptime"
+        now = av_gettime_relative() - start;
+
+        // simulate real-time reading
+        while (pts > now) {
+            usleep(1000);
+            now = av_gettime_relative() - start;
+        }
+
+        // keyframe or first Segment
+        if ((pkt.flags & AV_PKT_FLAG_KEY && pkt.stream_index == video_idx) || !seg) {
+            if (seg) {
+                segment_close(seg);
+
+                publisher_push_segment(info->pub, seg);
+                av_log(NULL, AV_LOG_DEBUG, "New segment pushed.\n");
+                publish(info->pub);
+                av_log(NULL, AV_LOG_DEBUG, "Published new segment.\n");
+            }
+            // Reset first so that a failed segment_init() cannot leave us
+            // writing into the segment that was just handed to the publisher.
+            seg = NULL;
+            segment_init(&seg, ifmt_ctx);
+            if (!seg) {
+                av_log(NULL, AV_LOG_ERROR, "Could not initialize segment.\n");
+                av_packet_unref(&pkt);
+                goto end;
+            }
+            seg->id = id++;
+            av_log(NULL, AV_LOG_DEBUG, "Starting new segment, id: %d\n", seg->id);
+        }
+
+        segment_ts_append(seg, pkt.dts, pkt.pts);
+        ret = av_write_frame(seg->fmt_ctx, &pkt);
+        av_packet_unref(&pkt);
+        if (ret < 0) {
+            // NOTE(review): the partially written segment is not released
+            // here; confirm which segment.h call is appropriate for that.
+            av_log(NULL,AV_LOG_ERROR, "av_write_frame() failed.\n");
+            goto end;
+        }
+    }
+
+    if (ret < 0 && ret != AVERROR_EOF) {
+        av_log(NULL, AV_LOG_ERROR, "Error occurred: %s\n", av_err2str(ret));
+        goto end;
+    }
+
+    // An input without any packet never created a segment.
+    if (seg) {
+        segment_close(seg);
+        publisher_push_segment(info->pub, seg);
+        publish(info->pub);
+    }
+
+end:
+    avformat_close_input(&ifmt_ctx);
+    info->pub->shutdown = 1;
+    return NULL;
+}
+
+/*
+ * Send the oldest queued segment of client c to that client.
+ *
+ * If the client's FIFO is empty the client is put into WAIT and nothing else
+ * happens. Otherwise the client is marked BUSY, the segment's in-memory data
+ * is demuxed through a custom AVIOContext (segment_read) and every packet is
+ * rewritten with the dts/pts recorded in seg->ts before it is muxed to
+ * c->ofmt_ctx.
+ *
+ * On success the segment is drained from the FIFO, unreferenced, and the
+ * client becomes WRITABLE again. On any failure (allocation, opening the
+ * segment, writing to the client) all temporary resources are released and the
+ * client is disconnected, so it is never left stuck in BUSY.
+ */
+void write_segment(struct Client *c)
+{
+    struct Segment *seg;
+    struct SegmentReadInfo info;
+    AVFormatContext *fmt_ctx = NULL;
+    AVIOContext *avio_ctx = NULL;
+    AVPacket pkt;
+    unsigned char *avio_buffer;
+    int opened = 0;
+    int failed = 1;
+    int pkt_count = 0;
+    int ret;
+
+    if (av_fifo_size(c->buffer) <= 0) {
+        client_set_state(c, WAIT);
+        return;
+    }
+
+    av_fifo_generic_peek(c->buffer, &seg, sizeof(struct Segment*), NULL);
+    client_set_state(c, BUSY);
+    c->current_segment_id = seg->id;
+    info.buf = seg->buf;
+    info.left = seg->size;
+
+    if (!(fmt_ctx = avformat_alloc_context())) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate format context\n");
+        goto cleanup;
+    }
+
+    avio_buffer = (unsigned char*) av_malloc(AV_BUFSIZE);
+    if (!avio_buffer) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate avio buffer\n");
+        goto cleanup;
+    }
+    avio_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 0, &info, &segment_read, NULL, NULL);
+    if (!avio_ctx) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate avio context\n");
+        av_free(avio_buffer);
+        goto cleanup;
+    }
+
+    fmt_ctx->pb = avio_ctx;
+    // On failure avformat_open_input() frees the context and resets fmt_ctx
+    // to NULL, so only the AVIOContext is left to clean up.
+    ret = avformat_open_input(&fmt_ctx, NULL, seg->ifmt, NULL);
+    if (ret < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Could not open input\n");
+        goto cleanup;
+    }
+    opened = 1;
+
+    ret = avformat_find_stream_info(fmt_ctx, NULL);
+    if (ret < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Could not find stream information\n");
+        goto cleanup;
+    }
+
+    av_log(NULL, AV_LOG_DEBUG, "Client: %d, Segment: %d\n", c->id, seg->id);
+
+    for (;;) {
+        ret = av_read_frame(fmt_ctx, &pkt);
+        if (ret < 0)
+            break;
+
+        // seg->ts holds the original timestamps as consecutive (dts, pts) pairs.
+        pkt.dts = seg->ts[pkt_count];
+        pkt.pts = seg->ts[pkt_count+1];
+        pkt_count += 2;
+
+        ret = av_write_frame(c->ofmt_ctx, &pkt);
+        av_packet_unref(&pkt);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_DEBUG, "write_frame failed, disconnecting client\n");
+            goto cleanup;
+        }
+    }
+    failed = 0;
+
+cleanup:
+    if (opened)
+        avformat_close_input(&fmt_ctx);
+    else
+        avformat_free_context(fmt_ctx); // no-op when fmt_ctx is NULL
+    if (avio_ctx) {
+        // libavformat may have replaced the buffer, so free the current one.
+        av_free(avio_ctx->buffer);
+        avio_context_free(&avio_ctx);
+    }
+
+    if (failed) {
+        client_disconnect(c);
+        return;
+    }
+
+    av_fifo_drain(c->buffer, sizeof(struct Segment*));
+    segment_unref(seg);
+    client_set_state(c, WRITABLE);
+}
+
+/* Set the HTTP status code that the client's pending handshake will reply with. */
+static void accept_set_reply_code(AVIOContext *client, int reply_code)
+{
+    int ret = av_opt_set_int(client, "reply_code", reply_code, AV_OPT_SEARCH_CHILDREN);
+    if (ret < 0)
+        av_log(NULL, AV_LOG_ERROR, "Failed to set reply_code: %s.\n", av_err2str(ret));
+}
+
+/*
+ * Run the HTTP handshake for client and return the resulting status code.
+ *
+ * reply_code is the status decided so far (200, or 503 when no slot is free).
+ * The status has to be set on the client while the handshake is still in
+ * progress, otherwise the reply has already been sent. Requests whose method
+ * is not GET, and failed handshakes, yield 400.
+ */
+static int accept_handshake(AVIOContext *client, int reply_code)
+{
+    int handshake;
+
+    if (reply_code != 200)
+        accept_set_reply_code(client, reply_code);
+
+    while ((handshake = avio_handshake(client)) > 0) {
+        uint8_t *method = NULL;
+        uint8_t *resource = NULL;
+
+        av_opt_get(client, "method", AV_OPT_SEARCH_CHILDREN, &method);
+        av_opt_get(client, "resource", AV_OPT_SEARCH_CHILDREN, &resource);
+        av_log(NULL, AV_LOG_DEBUG, "method: %s resource: %s\n",
+               method ? (const char*) method : "(null)",
+               resource ? (const char*) resource : "(null)");
+        if (method && strlen((const char*) method) && strncmp("GET", (const char*) method, 3)) {
+            reply_code = 400;
+            accept_set_reply_code(client, reply_code);
+        }
+        // Strings returned by av_opt_get() must be released with av_free().
+        av_free(method);
+        av_free(resource);
+    }
+
+    if (handshake < 0)
+        reply_code = 400;
+
+    return reply_code;
+}
+
+/*
+ * Create a matroska muxer that writes to client, mirroring the streams of
+ * ifmt_ctx, and write its header using the options in *mkvopts.
+ *
+ * Returns the new context, or NULL on failure (the context is freed; client
+ * itself is left open for the caller to close).
+ */
+static AVFormatContext *accept_open_muxer(AVFormatContext *ifmt_ctx, AVIOContext *client,
+                                          AVDictionary **mkvopts)
+{
+    AVFormatContext *ofmt_ctx = NULL;
+    AVOutputFormat *ofmt;
+    AVStream *in_stream, *out_stream;
+    unsigned int i;
+    int ret;
+
+    avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL);
+    if (!ofmt_ctx) {
+        av_log(NULL, AV_LOG_ERROR, "Could not allocate output format context.\n");
+        return NULL;
+    }
+
+    ofmt_ctx->flags |= AVFMT_FLAG_GENPTS;
+    // NOTE(review): this clears every flag except AVFMT_NOFILE on the muxer
+    // description that ofmt_ctx->oformat points to, not on this context only.
+    // Possibly "|=" was intended; left unchanged until confirmed.
+    ofmt = ofmt_ctx->oformat;
+    ofmt->flags &= AVFMT_NOFILE;
+
+    for (i = 0; i < ifmt_ctx->nb_streams; i++) {
+        in_stream = ifmt_ctx->streams[i];
+        out_stream = avformat_new_stream(ofmt_ctx, NULL);
+        if (!out_stream) {
+            av_log(NULL, AV_LOG_ERROR, "Could not allocate output stream.\n");
+            goto fail;
+        }
+
+        ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret));
+            goto fail;
+        }
+        av_dict_copy(&out_stream->metadata, in_stream->metadata, 0);
+    }
+
+    ofmt_ctx->pb = client;
+    ret = avformat_write_header(ofmt_ctx, mkvopts);
+    if (ret < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret));
+        goto fail;
+    }
+    return ofmt_ctx;
+
+fail:
+    avformat_free_context(ofmt_ctx);
+    return NULL;
+}
+
+/*
+ * Accept loop: listen on info->out_uri and hand every accepted client to the
+ * publisher.
+ *
+ * arg is a struct AcceptInfo *. For each connection a client slot is reserved,
+ * the HTTP handshake is performed, and a matroska muxer mirroring the input
+ * streams is created and passed to publisher_add_client(). A connection that
+ * cannot be served is closed and its reservation (if one was made) cancelled.
+ * The loop ends once info->pub->shutdown is set. Always returns NULL.
+ */
+void *accept_thread(void *arg)
+{
+    struct AcceptInfo *info = (struct AcceptInfo*) arg;
+    const char *out_uri = info->out_uri;
+    char status[4096];
+    AVIOContext *client;
+    AVIOContext *server = NULL;
+    AVFormatContext *ofmt_ctx;
+    AVDictionary *opts = NULL;
+    AVDictionary *mkvopts = NULL;
+    int ret, reply_code, reserved;
+
+    if ((ret = av_dict_set(&opts, "listen", "2", 0)) < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Failed to set listen mode for server: %s\n", av_err2str(ret));
+        goto end;
+    }
+
+    if ((ret = av_dict_set_int(&opts, "listen_timeout", LISTEN_TIMEOUT_MSEC, 0)) < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Failed to set listen_timeout for server: %s\n", av_err2str(ret));
+        goto end;
+    }
+
+    if ((ret = avio_open2(&server, out_uri, AVIO_FLAG_WRITE, NULL, &opts)) < 0) {
+        av_log(NULL, AV_LOG_ERROR, "Failed to open server: %s\n", av_err2str(ret));
+        goto end;
+    }
+
+    for (;;) {
+        if (info->pub->shutdown)
+            break;
+        publisher_gen_status_json(info->pub, status);
+        // status is data, never a format string.
+        av_log(NULL, AV_LOG_DEBUG, "%s", status);
+        client = NULL;
+        ofmt_ctx = NULL;
+        av_log(NULL, AV_LOG_DEBUG, "Accepting new clients.\n");
+
+        if ((ret = avio_accept(server, &client)) < 0) {
+            av_log(NULL, AV_LOG_DEBUG, "Timeout or error, retrying to accept.\n");
+            continue;
+        }
+
+        client->seekable = 0;
+
+        reply_code = 200;
+        // publisher_reserve_client() returns non-zero when no slot could be reserved.
+        reserved = !publisher_reserve_client(info->pub);
+        if (!reserved) {
+            av_log(NULL, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
+            reply_code = 503;
+        }
+
+        reply_code = accept_handshake(client, reply_code);
+
+        if (reply_code == 200) {
+            if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0)
+                av_log(NULL, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret));
+            else
+                ofmt_ctx = accept_open_muxer(info->ifmt_ctx, client, &mkvopts);
+        }
+
+        if (ofmt_ctx) {
+            // presumably the publisher takes over ofmt_ctx and the reserved slot
+            publisher_add_client(info->pub, ofmt_ctx);
+        } else {
+            if (reserved)
+                publisher_cancel_reserve(info->pub);
+            avio_close(client);
+        }
+
+        // Drop whatever avformat_write_header() left in the dictionary.
+        av_dict_free(&mkvopts);
+    }
+    av_log(NULL, AV_LOG_INFO, "Shutting down http server.\n");
+    ret = avio_close(server);
+    av_log(NULL, AV_LOG_INFO, "Shut down http server: %d\n", ret);
+
+end:
+    av_dict_free(&opts);
+    return NULL;
+}
+
+/*
+ * Writer worker: every 500 ms walk over all client slots of the publisher and
+ * call write_segment() for each client that is WRITABLE.
+ *
+ * arg is a struct WriteInfo *. While the publisher is shutting down, a client
+ * whose current segment id equals the publisher's current segment id is
+ * disconnected after its write. The thread ends once shutdown is set and a
+ * full pass found every slot FREE. Always returns NULL.
+ */
+void *write_thread(void *arg)
+{
+    struct WriteInfo *winfo = (struct WriteInfo*) arg;
+    int slot, free_slots;
+
+    do {
+        free_slots = 0;
+        usleep(500000);
+        av_log(NULL, AV_LOG_DEBUG, "Checking clients, thread: %d\n", winfo->thread_id);
+
+        for (slot = 0; slot < MAX_CLIENTS; slot++) {
+            struct Client *cl = &winfo->pub->clients[slot];
+            // Look at the state exactly once per slot and pass.
+            int state = cl->state;
+
+            if (state == WRITABLE) {
+                write_segment(cl);
+                if (winfo->pub->shutdown &&
+                    winfo->pub->current_segment_id == cl->current_segment_id) {
+                    client_disconnect(cl);
+                }
+            } else if (state == FREE) {
+                free_slots++;
+            }
+        }
+    } while (!(winfo->pub->shutdown && free_slots == MAX_CLIENTS));
+
+    return NULL;
+}
+
+
+/*
+ * Entry point: ffserver [input]
+ *
+ * Opens the input (argv[1], default "pipe:0"), creates the publisher, starts
+ * pub->nb_threads writer threads and the reader thread, and runs the accept
+ * loop on "http://0:8080" in the calling thread. Once the accept loop returns,
+ * all threads are joined and the publisher is released.
+ *
+ * Returns 0 after a normal run, 1 if the input could not be opened or a
+ * resource (publisher, memory, thread) could not be set up.
+ */
+int main(int argc, char *argv[])
+{
+    struct ReadInfo rinfo;
+    struct AcceptInfo ainfo;
+    struct WriteInfo *winfos = NULL;
+    struct PublisherContext *pub = NULL;
+    AVFormatContext *ifmt_ctx = NULL;
+    pthread_t r_thread;
+    pthread_t *w_threads = NULL;
+    int nb_started = 0;
+    int exit_code = 1;
+    int ret, i;
+
+    rinfo.in_filename = "pipe:0";
+    ainfo.out_uri = "http://0:8080";
+    if (argc > 1)
+        rinfo.in_filename = argv[1];
+
+    //av_register_all();
+    avformat_network_init();
+
+    av_log_set_level(AV_LOG_DEBUG);
+
+    if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL)) < 0) {
+        av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n");
+        goto end;
+    }
+
+    publisher_init(&pub);
+    if (!pub) {
+        av_log(NULL, AV_LOG_ERROR, "main: Could not initialize publisher\n");
+        avformat_close_input(&ifmt_ctx);
+        goto end;
+    }
+
+    rinfo.ifmt_ctx = ifmt_ctx;
+    rinfo.pub = pub;
+    ainfo.ifmt_ctx = ifmt_ctx;
+    ainfo.pub = pub;
+
+    w_threads = calloc(pub->nb_threads, sizeof(*w_threads));
+    winfos = calloc(pub->nb_threads, sizeof(*winfos));
+    if (pub->nb_threads > 0 && (!w_threads || !winfos)) {
+        av_log(NULL, AV_LOG_ERROR, "main: Could not allocate thread data\n");
+        avformat_close_input(&ifmt_ctx);
+        goto end_pub;
+    }
+
+    for (i = 0; i < pub->nb_threads; i++) {
+        winfos[i].pub = pub;
+        winfos[i].thread_id = i;
+        ret = pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]);
+        if (ret) {
+            av_log(NULL, AV_LOG_ERROR, "main: Could not create write thread: %d\n", ret);
+            break;
+        }
+        nb_started++;
+    }
+
+    ret = nb_started == pub->nb_threads
+        ? pthread_create(&r_thread, NULL, read_thread, &rinfo)
+        : -1;
+    if (ret == 0) {
+        accept_thread(&ainfo);
+        // read_thread() closes the input before it returns.
+        pthread_join(r_thread, NULL);
+        exit_code = 0;
+    } else {
+        if (ret > 0)
+            av_log(NULL, AV_LOG_ERROR, "main: Could not create read thread: %d\n", ret);
+        // No reader is running: close the input here and tell the writer
+        // threads that were started to stop.
+        avformat_close_input(&ifmt_ctx);
+        pub->shutdown = 1;
+    }
+
+    for (i = 0; i < nb_started; i++) {
+        pthread_join(w_threads[i], NULL);
+    }
+
+end_pub:
+    free(w_threads);
+    free(winfos);
+
+    // NOTE(review): pub is released with free() as before; confirm this
+    // matches the allocator publisher_init() uses.
+    publisher_free(pub);
+    free(pub);
+end:
+    avformat_network_deinit();
+    return exit_code;
+}
--
2.16.2
More information about the ffmpeg-devel
mailing list