[FFmpeg-devel] [PATCH 1/1] libavformat/rtmp: Implements RTMP reconnect feature

Jordi Cenzano jordi.cenzano at gmail.com
Mon Dec 6 03:30:30 EET 2021


>
> > Nowadays when you are streaming to a live platform, if the RTMP(S)
> > server needs to be restarted for any reason (e.g. to deploy a new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms have shown interest in implementing
> > this mechanism.
> > This can already be tested against Facebook Live production using
> > the querystring parameter ?ccr_sec=120 (which tells the backend
> > to send a disconnect signal after that many seconds).
>
> It seems like this approach is operating from the assumption that the
> time to set up a new connection and process all RPCs necessary to send
> media is on the order of normal jitter. Or am I misunderstanding?
>

You are right!

>
> For many services I don't really think that's the case.
>

Fair enough

>
> And even with a very fast publish response we are looking at like 1.5
> RTTs for TCP, another 1.5 for TLS, another 1.5 for RTMP handshake,
> another 1 RTT for RTMP connect, an RTT on createStream, and an RTT on
> publish. That's like 7.5 RTTs (or 300 ms at 40ms RTT) where we are
> leaving the media flow on pause while we are re-building the connection.
>

Yes, and we are aware that is NOT ideal. But we thought it would be better
to add ~400ms of latency (which in most cases will be hidden by the player
buffer) than to simply drop frames until the next IDR, which usually
produces a much longer disruption since GOPs are typically 2s+ long.
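
To put rough numbers on it (illustrative only, using the 40ms RTT from
above and a 2s GOP; this is a sketch, not a measurement):

    /* Back-of-envelope comparison; all values are illustrative assumptions. */
    #include <stdio.h>

    int main(void)
    {
        double rtt_ms       = 40.0;  /* assumed round-trip time                    */
        double rtts_needed  = 7.5;   /* TCP + TLS + RTMP handshake/connect/publish */
        double reconnect_ms = rtts_needed * rtt_ms;  /* ~300 ms pause while rebuilding */

        double gop_s        = 2.0;   /* assumed live GOP length                    */
        double drop_avg_ms  = gop_s * 1000.0 / 2.0;  /* average wait for the next IDR */
        double drop_max_ms  = gop_s * 1000.0;        /* worst case                    */

        printf("reconnect pause ~%.0f ms vs dropping until next IDR ~%.0f-%.0f ms\n",
               reconnect_ms, drop_avg_ms, drop_max_ms);
        return 0;
    }

Even with the extra RPCs, the reconnect pause stays within what a typical
player buffer absorbs, while a drop-until-IDR gap generally does not.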

>
> This also seems to conflate re-bootstrapping a media decode session with
> re-bootstrapping an RTMP session. The cost of doing this seems to be
> sending your biggest frame after a pause to resolve a bunch of
> synchronous RPCs on a relatively fresh TCP connection.
>

YES, and as I mentioned, we know that is NOT ideal at all.

BTW, here we are talking about LIVE streams.
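
For reference, a test invocation against Facebook Live production would
look roughly like this (the input file, ingest host and stream key are
placeholders; only the ?ccr_sec=120 query parameter comes from the
proposal):

    ffmpeg -re -i input.mp4 -c copy -f flv \
        "rtmps://<facebook-live-ingest>/rtmp/<stream-key>?ccr_sec=120"

The backend then sends the GoAway/disconnect signal after 120 seconds and
the client is expected to reconnect transparently at the next suitable
point (IDR frame, with the sequence headers already saved).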

>
> > ---
> >  libavformat/rtmppkt.c   |  19 +++
> >  libavformat/rtmppkt.h   |  10 ++
> >  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> >  3 files changed, 359 insertions(+), 26 deletions(-)
> >
> >[...]
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> >      RTMP_PT_SHARED_OBJ,         ///< shared object
> >      RTMP_PT_INVOKE,             ///< invoke some stream action
> >      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> > +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP, server is about to go down
> >  } RTMPPacketType;
> >
>
> I'm curious as to why this is a new top level message rather than just
> another
> type 20 command message. Message types have a small address space while
> commands have a large address space and a well chosen command name is
> unlikely
> to conflict with (and therefore can be used in concert with) any other
> protocol
> extensions.
>

We thought about it, and it seemed more logical to add it as a packet type
since it affects the RTMP connection itself. If you read the definition of
a Command message, it seems more oriented toward sending data / actions
over the underlying connection.
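
To make the contrast concrete, here is a minimal client-side sketch of the
two dispatch options (the command-message variant and its "onGoAway" name
are hypothetical, not part of the patch):

    /* Sketch only: option A is what the patch does (new message type id 32);
     * option B shows what a hypothetical type-20 command message would need
     * (AMF parsing of an agreed-upon command name, here "onGoAway"). */
    #include <string.h>

    enum { RTMP_PT_INVOKE = 20, RTMP_PT_GO_AWAY = 32 };

    static int is_go_away(int msg_type, const char *amf_command_name)
    {
        if (msg_type == RTMP_PT_GO_AWAY)            /* option A: type id check only   */
            return 1;
        if (msg_type == RTMP_PT_INVOKE && amf_command_name &&
            !strcmp(amf_command_name, "onGoAway"))  /* option B: hypothetical command  */
            return 1;
        return 0;
    }

The type-id route is visible from the message header alone, while a command
message would need its AMF body parsed before the signal can be recognized.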

Thanks a lot for your comments, Alex; happy to hear ideas to improve this
proposal. And sorry for my late reply.

>
> > [snip]
>
>
> On Sun, Sep 26, 2021 at 1:52 PM Jordi Cenzano <jordi.cenzano at gmail.com>
> wrote:
> >
> > Nowadays when you are streaming to a live platform, if the RTMP(S)
> > server needs to be restarted for any reason (e.g. to deploy a new version)
> > the RTMP connection is interrupted (probably after some draining time).
> > Facebook will publish a proposal to avoid that by sending a
> > GoAway message in the RTMP protocol.
> > This code is the reference client implementation of that proposal.
> > AFAIK other big live platforms have shown interest in implementing
> > this mechanism.
> > This can already be tested against Facebook Live production using
> > the querystring parameter ?ccr_sec=120 (which tells the backend
> > to send a disconnect signal after that many seconds).
> > ---
> >  libavformat/rtmppkt.c   |  19 +++
> >  libavformat/rtmppkt.h   |  10 ++
> >  libavformat/rtmpproto.c | 356 +++++++++++++++++++++++++++++++++++++---
> >  3 files changed, 359 insertions(+), 26 deletions(-)
> >
> > diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
> > index 4b97c0833f..84ec72740d 100644
> > --- a/libavformat/rtmppkt.c
> > +++ b/libavformat/rtmppkt.c
> > @@ -405,6 +405,25 @@ int ff_rtmp_packet_write(URLContext *h, RTMPPacket
> *pkt,
> >      return written;
> >  }
> >
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket *pkt_src)
> > +{
> > +    if (pkt_src->size) {
> > +        pkt_dst->data = av_realloc(NULL, pkt_src->size);
> > +        if (!pkt_dst->data)
> > +            return AVERROR(ENOMEM);
> > +        else
> > +            memcpy(pkt_dst->data, pkt_src->data, pkt_src->size);
> > +    }
> > +    pkt_dst->size       = pkt_src->size;
> > +    pkt_dst->channel_id = pkt_src->channel_id;
> > +    pkt_dst->type       = pkt_src->type;
> > +    pkt_dst->timestamp  = pkt_src->timestamp;
> > +    pkt_dst->extra      = pkt_src->extra;
> > +    pkt_dst->ts_field   = pkt_src->ts_field;
> > +
> > +    return 0;
> > +}
> > +
> >  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id,
> RTMPPacketType type,
> >                            int timestamp, int size)
> >  {
> > diff --git a/libavformat/rtmppkt.h b/libavformat/rtmppkt.h
> > index a15d2a5773..cdb901df89 100644
> > --- a/libavformat/rtmppkt.h
> > +++ b/libavformat/rtmppkt.h
> > @@ -59,6 +59,7 @@ typedef enum RTMPPacketType {
> >      RTMP_PT_SHARED_OBJ,         ///< shared object
> >      RTMP_PT_INVOKE,             ///< invoke some stream action
> >      RTMP_PT_METADATA     = 22,  ///< FLV metadata
> > +    RTMP_PT_GO_AWAY      = 32,  ///< Indicates please reconnect ASAP, server is about to go down
> >  } RTMPPacketType;
> >
> >  /**
> > @@ -99,6 +100,15 @@ typedef struct RTMPPacket {
> >  int ff_rtmp_packet_create(RTMPPacket *pkt, int channel_id,
> RTMPPacketType type,
> >                            int timestamp, int size);
> >
> > +/**
> > + * Clone RTMP packet
> > + *
> > + * @param pkt_dst packet destination
> > + * @param pkt_src packet source
> > + * @return zero on success, negative value otherwise
> > + */
> > +int ff_rtmp_packet_clone(RTMPPacket *pkt_dst, const RTMPPacket
> *pkt_src);
> > +
> >  /**
> >   * Free RTMP packet.
> >   *
> > diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
> > index b14d23b919..ea37b9880a 100644
> > --- a/libavformat/rtmpproto.c
> > +++ b/libavformat/rtmpproto.c
> > @@ -124,11 +124,21 @@ typedef struct RTMPContext {
> >      int           nb_streamid;                ///< The next stream id
> to return on createStream calls
> >      double        duration;                   ///< Duration of the
> stream in seconds as returned by the server (only valid if non-zero)
> >      int           tcp_nodelay;                ///< Use TCP_NODELAY to
> disable Nagle's algorithm if set to 1
> > +    int           reconnect_interval;         ///< Forces a reconnection every X seconds (in media time)
> >      char          username[50];
> >      char          password[50];
> >      char          auth_params[500];
> >      int           do_reconnect;
> > +    uint32_t      last_reconnect_timestamp;
> >      int           auth_tried;
> > +    int           force_reconnection_now;
> > +    int           go_away_received;
> > +    AVDictionary* original_opts;
> > +    char          original_uri[TCURL_MAX_LENGTH];
> > +    int           original_flags;
> > +    RTMPPacket    last_avc_seq_header_pkt;    ///< rtmp packet, used to
> save last AVC video header, used on reconnection
> > +    RTMPPacket    last_aac_seq_header_pkt;    ///< rtmp packet, used to
> save last AAC audio header, used on reconnection
> > +    RTMPPacket    last_metadata_pkt;        ///< rtmp packet, used to
> save last onMetadata info, used on reconnection
> >  } RTMPContext;
> >
> >  #define PLAYER_KEY_OPEN_PART_LEN 30   ///< length of partial key used
> for first client digest signing
> > @@ -224,7 +234,7 @@ static void free_tracked_methods(RTMPContext *rt)
> >      rt->nb_tracked_methods   = 0;
> >  }
> >
> > -static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
> > +static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int
> track, int destroy)
> >  {
> >      int ret;
> >
> > @@ -248,7 +258,9 @@ static int rtmp_send_packet(RTMPContext *rt,
> RTMPPacket *pkt, int track)
> >      ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
> >                                 &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
> >  fail:
> > -    ff_rtmp_packet_destroy(pkt);
> > +    if (destroy)
> > +        ff_rtmp_packet_destroy(pkt);
> > +
> >      return ret;
> >  }
> >
> > @@ -336,6 +348,9 @@ static int gen_connect(URLContext *s, RTMPContext
> *rt)
> >      if (!rt->is_input) {
> >          ff_amf_write_field_name(&p, "type");
> >          ff_amf_write_string(&p, "nonprivate");
> > +        // Indicates accepts goaway
> > +        ff_amf_write_field_name(&p, "supportsGoAway");
> > +        ff_amf_write_bool(&p, 1);
> >      }
> >      ff_amf_write_field_name(&p, "flashVer");
> >      ff_amf_write_string(&p, rt->flashver);
> > @@ -400,7 +415,7 @@ static int gen_connect(URLContext *s, RTMPContext
> *rt)
> >
> >      pkt.size = p - pkt.data;
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >
> > @@ -611,7 +626,7 @@ static int gen_release_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -635,7 +650,7 @@ static int gen_fcpublish_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -659,7 +674,7 @@ static int gen_fcunpublish_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -683,7 +698,7 @@ static int gen_create_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_number(&p, ++rt->nb_invokes);
> >      ff_amf_write_null(&p);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >
> > @@ -709,7 +724,7 @@ static int gen_delete_stream(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_number(&p, rt->stream_id);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -733,7 +748,7 @@ static int gen_get_stream_length(URLContext *s,
> RTMPContext *rt)
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, rt->playpath);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -754,7 +769,7 @@ static int gen_buffer_time(URLContext *s,
> RTMPContext *rt)
> >      bytestream_put_be32(&p, rt->stream_id);
> >      bytestream_put_be32(&p, rt->client_buffer_time);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -782,7 +797,7 @@ static int gen_play(URLContext *s, RTMPContext *rt)
> >      ff_amf_write_string(&p, rt->playpath);
> >      ff_amf_write_number(&p, rt->live * 1000);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  static int gen_seek(URLContext *s, RTMPContext *rt, int64_t timestamp)
> > @@ -805,7 +820,7 @@ static int gen_seek(URLContext *s, RTMPContext *rt,
> int64_t timestamp)
> >      ff_amf_write_null(&p); //as usual, the first null param
> >      ff_amf_write_number(&p, timestamp); //where we want to jump
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -832,7 +847,7 @@ static int gen_pause(URLContext *s, RTMPContext *rt,
> int pause, uint32_t timesta
> >      ff_amf_write_bool(&p, pause); // pause or unpause
> >      ff_amf_write_number(&p, timestamp); //where we pause the stream
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -859,7 +874,7 @@ static int gen_publish(URLContext *s, RTMPContext
> *rt)
> >      ff_amf_write_string(&p, rt->playpath);
> >      ff_amf_write_string(&p, "live");
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -885,7 +900,7 @@ static int gen_pong(URLContext *s, RTMPContext *rt,
> RTMPPacket *ppkt)
> >      bytestream_put_be16(&p, 7); // PingResponse
> >      bytestream_put_be32(&p, AV_RB32(ppkt->data+2));
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -906,7 +921,7 @@ static int gen_swf_verification(URLContext *s,
> RTMPContext *rt)
> >      bytestream_put_be16(&p, 27);
> >      memcpy(p, rt->swfverification, 42);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -925,7 +940,7 @@ static int gen_window_ack_size(URLContext *s,
> RTMPContext *rt)
> >      p = pkt.data;
> >      bytestream_put_be32(&p, rt->max_sent_unacked);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  /**
> > @@ -946,7 +961,7 @@ static int gen_check_bw(URLContext *s, RTMPContext
> *rt)
> >      ff_amf_write_number(&p, ++rt->nb_invokes);
> >      ff_amf_write_null(&p);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -965,7 +980,7 @@ static int gen_bytes_read(URLContext *s, RTMPContext
> *rt, uint32_t ts)
> >      p = pkt.data;
> >      bytestream_put_be32(&p, rt->bytes_read);
> >
> > -    return rtmp_send_packet(rt, &pkt, 0);
> > +    return rtmp_send_packet(rt, &pkt, 0, 1);
> >  }
> >
> >  static int gen_fcsubscribe_stream(URLContext *s, RTMPContext *rt,
> > @@ -985,7 +1000,7 @@ static int gen_fcsubscribe_stream(URLContext *s,
> RTMPContext *rt,
> >      ff_amf_write_null(&p);
> >      ff_amf_write_string(&p, subscribe);
> >
> > -    return rtmp_send_packet(rt, &pkt, 1);
> > +    return rtmp_send_packet(rt, &pkt, 1, 1);
> >  }
> >
> >  /**
> > @@ -2153,6 +2168,16 @@ static int handle_invoke_status(URLContext *s,
> RTMPPacket *pkt)
> >      return 0;
> >  }
> >
> > +static int handle_go_away(URLContext *s, RTMPPacket *pkt) {
> > +    RTMPContext *rt = s->priv_data;
> > +
> > +    av_log(s, AV_LOG_TRACE, "go away signal received\n");
> > +
> > +    rt->go_away_received = 1;
> > +
> > +    return 0;
> > +}
> > +
> >  static int handle_invoke(URLContext *s, RTMPPacket *pkt)
> >  {
> >      RTMPContext *rt = s->priv_data;
> > @@ -2331,6 +2356,10 @@ static int rtmp_parse_result(URLContext *s,
> RTMPContext *rt, RTMPPacket *pkt)
> >          if ((ret = handle_invoke(s, pkt)) < 0)
> >              return ret;
> >          break;
> > +    case RTMP_PT_GO_AWAY:
> > +        if ((ret = handle_go_away(s, pkt)) < 0)
> > +            return ret;
> > +        break;
> >      case RTMP_PT_VIDEO:
> >      case RTMP_PT_AUDIO:
> >      case RTMP_PT_METADATA:
> > @@ -2513,6 +2542,15 @@ static int rtmp_close(URLContext *h)
> >      free_tracked_methods(rt);
> >      av_freep(&rt->flv_data);
> >      ffurl_closep(&rt->stream);
> > +    if (rt->last_avc_seq_header_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > +
> > +    if (rt->last_aac_seq_header_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > +
> > +    if (rt->last_metadata_pkt.size)
> > +        ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > +
> >      return ret;
> >  }
> >
> > @@ -2871,14 +2909,23 @@ reconnect:
> >                  goto fail;
> >          }
> >      } else {
> > -        rt->flv_size = 0;
> > -        rt->flv_data = NULL;
> > -        rt->flv_off  = 0;
> > -        rt->skip_bytes = 13;
> > +        // Do not clean buffers if it is a forced reconnection
> > +        if (rt->force_reconnection_now <= 0) {
> > +            rt->flv_size = 0;
> > +            rt->flv_data = NULL;
> > +            rt->flv_off  = 0;
> > +            rt->skip_bytes = 13;
> > +        }
> >      }
> >
> >      s->max_packet_size = rt->stream->max_packet_size;
> >      s->is_streamed     = 1;
> > +
> > +    // Copy original params
> > +    av_dict_copy(&rt->original_opts, *opts, 0);
> > +    rt->original_flags = flags;
> > +    av_strlcpy(rt->original_uri, uri, TCURL_MAX_LENGTH);
> > +
> >      return 0;
> >
> >  fail:
> > @@ -2951,6 +2998,107 @@ static int rtmp_pause(URLContext *s, int pause)
> >      return 0;
> >  }
> >
> > +/**
> > + * Reconnect RTMP connection.
> > +*/
> > +static int rtmp_reconnect(URLContext *s) {
> > +    RTMPContext *rt = s->priv_data;
> > +    int i;
> > +
> > +    // Close current RTMP connection
> > +    av_log(s, AV_LOG_INFO, "reconnecting!\n");
> > +
> > +    ffurl_closep(&rt->stream);
> > +    rt->do_reconnect = 0;
> > +    rt->nb_invokes   = 0;
> > +    for (i = 0; i < 2; i++)
> > +        memset(rt->prev_pkt[i], 0, sizeof(**rt->prev_pkt) *
> rt->nb_prev_pkt[i]);
> > +
> > +    free_tracked_methods(rt);
> > +
> > +    // Connect RTMP again using original values
> > +    return rtmp_open(s, rt->original_uri, rt->original_flags,
> &rt->original_opts);
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AAC header
> > +*/
> > +static int rtmp_packet_is_aac_audio_header(RTMPPacket *pkt) {
> > +    uint8_t sound_format;
> > +    uint8_t aac_packet_type;
> > +
> > +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_AUDIO)
> > +        return 0;
> > +
> > +    sound_format = (pkt->data[0] & 0xF0) >> 4;
> > +    aac_packet_type = pkt->data[1];
> > +    // Check sound format == AAC and packet contains the AAC sequence header
> > +    if (sound_format == 10 && aac_packet_type == 0)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains an AVC header
> > +*/
> > +static int rtmp_packet_is_avc_video_header(RTMPPacket *pkt) {
> > +    uint8_t codec_id;
> > +    uint8_t avc_packet_type;
> > +
> > +    if ((!pkt) || (pkt->size < 2) || pkt->type != RTMP_PT_VIDEO)
> > +        return 0;
> > +
> > +    codec_id = pkt->data[0] & 0xF;
> > +    avc_packet_type = pkt->data[1];
> > +    // Check codec == AVC and avc contains seq header
> > +    if (codec_id == 7 && avc_packet_type == 0)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains video IDR point
> > +*/
> > +static int rtmp_packet_is_video_avc_IDR(RTMPPacket *pkt) {
> > +    uint8_t frame_type;
> > +    uint8_t codec_id;
> > +
> > +    if ((!pkt) || (pkt->size < 1) || pkt->type != RTMP_PT_VIDEO)
> > +        return 0;
> > +
> > +    frame_type = (pkt->data[0] & 0xF0) >> 4;
> > +    codec_id = pkt->data[0] & 0xF;
> > +    // Check codec == AVC and videoFrame == Keyframe / seekable
> (assuming that means IDR)
> > +    if (codec_id == 7 && frame_type == 1)
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> > +/**
> > + * Checks RTMP packet and return 1 when it contains onMetadata info
> > +*/
> > +static int rtmp_packet_is_onMetadata_packet(RTMPPacket *pkt) {
> > +    uint8_t commandbuffer[64];
> > +    int stringlen;
> > +    GetByteContext gbc;
> > +
> > +    if ((!pkt) || (pkt->size < 10) || pkt->type != RTMP_PT_NOTIFY)
> > +        return 0;
> > +
> > +    bytestream2_init(&gbc, pkt->data, pkt->size);
> > +    if (ff_amf_read_string(&gbc, commandbuffer,
> sizeof(commandbuffer),&stringlen))
> > +        return 0;
> > +
> > +    // onMetadata is prepended by "@setDataFrame"
> > +    if (!strcmp(commandbuffer, "@setDataFrame"))
> > +        return 1;
> > +
> > +    return 0;
> > +}
> > +
> >  static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
> >  {
> >      RTMPContext *rt = s->priv_data;
> > @@ -2960,6 +3108,8 @@ static int rtmp_write(URLContext *s, const uint8_t
> *buf, int size)
> >      const uint8_t *buf_temp = buf;
> >      uint8_t c;
> >      int ret;
> > +    int execute_reconnection = 0;
> > +    int is_idr = 0;
> >
> >      do {
> >          if (rt->skip_bytes) {
> > @@ -2988,8 +3138,13 @@ static int rtmp_write(URLContext *s, const
> uint8_t *buf, int size)
> >              bytestream_get_be24(&header);
> >              rt->flv_size = pktsize;
> >
> > -            if (pkttype == RTMP_PT_VIDEO)
> > +            if (pkttype == RTMP_PT_VIDEO) {
> >                  channel = RTMP_VIDEO_CHANNEL;
> > +                rt->has_video = 1;
> > +            }
> > +            if (pkttype == RTMP_PT_AUDIO) {
> > +                rt->has_audio = 1;
> > +            }
> >
> >              if (((pkttype == RTMP_PT_VIDEO || pkttype == RTMP_PT_AUDIO)
> && ts == 0) ||
> >                  pkttype == RTMP_PT_NOTIFY) {
> > @@ -3047,7 +3202,155 @@ static int rtmp_write(URLContext *s, const
> uint8_t *buf, int size)
> >                  }
> >              }
> >
> > -            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0)) < 0)
> > +            // Check if a reconnection is required
> > +            // Per interval
> > +            if ((rt->reconnect_interval > 0) &&
> > +                (rt->out_pkt.timestamp >= (rt->last_reconnect_timestamp
> + rt->reconnect_interval * 1000))) {
> > +                rt->last_reconnect_timestamp = rt->out_pkt.timestamp;
> > +                rt->force_reconnection_now = 1;
> > +                av_log(s, AV_LOG_TRACE,
> > +                       "trigered internal interval reconnection\n");
> > +            }
> > +            // Per go away signal
> > +            if (rt->go_away_received > 0) {
> > +                rt->go_away_received = 0;
> > +                rt->force_reconnection_now = 1;
> > +                av_log(s, AV_LOG_TRACE,
> > +                       "detected go away signal from the peer\n");
> > +            }
> > +
> > +            if (rtmp_packet_is_avc_video_header(&rt->out_pkt)) {
> > +                // Save last video header
> > +                if (rt->last_avc_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last video header packet saved\n");
> > +
> ff_rtmp_packet_destroy(&rt->last_avc_seq_header_pkt);
> > +                }
> > +                // Save AVC seq header packet
> > +                if ((ret =
> ff_rtmp_packet_clone(&rt->last_avc_seq_header_pkt, &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved video header packet\n");
> > +            } else if (rtmp_packet_is_aac_audio_header(&rt->out_pkt)) {
> > +                // Save last audio header
> > +                if (rt->last_aac_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last audio header packet saved\n");
> > +
> ff_rtmp_packet_destroy(&rt->last_aac_seq_header_pkt);
> > +                }
> > +                // Save AAC seq header packet
> > +                if ((ret =
> ff_rtmp_packet_clone(&rt->last_aac_seq_header_pkt, &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved audio header packet\n");
> > +            } else if (rtmp_packet_is_onMetadata_packet(&rt->out_pkt)) {
> > +                // Save last onMetadata packet
> > +                if (rt->last_metadata_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "freeing last onMetadata packet saved\n");
> > +                    ff_rtmp_packet_destroy(&rt->last_metadata_pkt);
> > +                }
> > +                // Save onMetadata packet
> > +                if ((ret = ff_rtmp_packet_clone(&rt->last_metadata_pkt,
> &rt->out_pkt)) < 0) {
> > +                    return ret;
> > +                }
> > +                av_log(s, AV_LOG_DEBUG, "saved onMetadata packet\n");
> > +            }
> > +
> > +            // Reconnection has been requested
> > +            if (rt->force_reconnection_now >= 1) {
> > +                // Check if packet is video IDR
> > +                is_idr = rtmp_packet_is_video_avc_IDR(&rt->out_pkt);
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "looking for the right disconnect point. Is IDR:
> %d, "
> > +                       "has_video: %d, has_audio: %d, state: %d, "
> > +                       "last_avc_seq_header_pkt.size: %d, "
> > +                       "last_aac_seq_header_pkt.size: %d\n",
> > +                       is_idr, rt->has_video, rt->has_audio, rt->state,
> > +                       rt->last_avc_seq_header_pkt.size,
> > +                       rt->last_aac_seq_header_pkt.size);
> > +
> > +                if (rt->has_video && rt->has_audio &&
> > +                    (rt->state == STATE_PUBLISHING)) {
> > +                    // If we have both video and audio, do the reconnection
> > +                    // on an IDR frame once we have both headers saved
> > +                    if (is_idr && rt->last_avc_seq_header_pkt.size &&
> > +                        rt->last_aac_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else if (rt->has_video && !rt->has_audio &&
> > +                           (rt->state == STATE_PUBLISHING)) {
> > +                    // If we have video and NO audio let's do the
> reconnection
> > +                    // in an IDR frame when we have video header saved
> > +                    if (is_idr && rt->last_avc_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else if (!rt->has_video &&
> > +                           rt->has_audio && (rt->state == STATE_PUBLISHING)) {
> > +                    // If we have only audio let's do the reconnection
> when we
> > +                    // have the audio header saved
> > +                    if (rt->last_aac_seq_header_pkt.size)
> > +                        execute_reconnection = 1;
> > +                } else {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "reconnection is requested but can NOT be
> executed "
> > +                           "now, waiting! rt->state: %d, has_video: %d,
> "
> > +                           "has_audio: %d, is_idr: %d\n",
> > +                           rt->state, rt->has_video, rt->has_audio,
> is_idr);
> > +                }
> > +            }
> > +
> > +            if (execute_reconnection) {
> > +                execute_reconnection = 0;
> > +
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "executing reconnection. rt->flv_off: %d,
> rt->flv_size: "
> > +                       "%d\n",
> > +                       rt->flv_off, rt->flv_size);
> > +
> > +                if ((ret = rtmp_reconnect(s)) < 0)
> > +                    return ret;
> > +
> > +                // Reconnect executed, clear the flag
> > +                rt->force_reconnection_now = 0;
> > +
> > +                av_log(s, AV_LOG_DEBUG,
> > +                       "reconnected. rt->flv_off: %d, rt->flv_size:
> %d\n",
> > +                       rt->flv_off, rt->flv_size);
> > +
> > +                // Send last video header if it is saved
> > +                if (rt->last_avc_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved video header\n");
> > +                    rt->last_avc_seq_header_pkt.timestamp =
> > +                        rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(
> > +                             rt, &rt->last_avc_seq_header_pkt, 0, 0)) <
> 0)
> > +                        return ret;
> > +                }
> > +
> > +                // Send last audio header if it is saved
> > +                if (rt->last_aac_seq_header_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved audio header\n");
> > +                    rt->last_aac_seq_header_pkt.timestamp =
> > +                        rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(
> > +                             rt, &rt->last_aac_seq_header_pkt, 0, 0)) <
> 0)
> > +                        return ret;
> > +                }
> > +
> > +                // Send last onMetadata packet, optional
> > +                if (rt->last_metadata_pkt.size) {
> > +                    av_log(s, AV_LOG_DEBUG,
> > +                           "sending last saved onMetadata header\n");
> > +                    rt->last_metadata_pkt.timestamp =
> rt->out_pkt.timestamp;
> > +                    if ((ret = rtmp_send_packet(rt,
> &rt->last_metadata_pkt, 0,
> > +                                                0)) < 0)
> > +                        return ret;
> > +                }
> > +            }
> > +
> > +            // Send actual packet
> > +            if ((ret = rtmp_send_packet(rt, &rt->out_pkt, 0, 1)) < 0)
> >                  return ret;
> >              rt->flv_size = 0;
> >              rt->flv_off = 0;
> > @@ -3118,6 +3421,7 @@ static const AVOption rtmp_options[] = {
> >      {"listen",      "Listen for incoming rtmp connections",
> OFFSET(listen), AV_OPT_TYPE_INT, {.i64 = 0}, INT_MIN, INT_MAX, DEC,
> "rtmp_listen" },
> >      {"tcp_nodelay", "Use TCP_NODELAY to disable Nagle's algorithm",
> OFFSET(tcp_nodelay), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, DEC|ENC},
> >      {"timeout", "Maximum timeout (in seconds) to wait for incoming
> connections. -1 is infinite. Implies -rtmp_listen 1",
> OFFSET(listen_timeout), AV_OPT_TYPE_INT, {.i64 = -1}, INT_MIN, INT_MAX,
> DEC, "rtmp_listen" },
> > +    {"rtmp_reconnect_time", "Interval (in seconds) to force a client
> reconnection, it is based on media time. By default is 0 (no
> reconnection)", OFFSET(reconnect_interval), AV_OPT_TYPE_INT, {.i64 = 0}, 0,
> INT_MAX, ENC },
> >      { NULL },
> >  };
> >
> > --
> > 2.32.0
> >
> > _______________________________________________
> > ffmpeg-devel mailing list
> > ffmpeg-devel at ffmpeg.org
> > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
> >
> > To unsubscribe, visit link above, or email
> > ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
>

