[FFmpeg-devel] [PATCH v2 02/03] libavdevice/avfoundation.m: Replace mutex-based concurrency handling in avfoundation.m by a thread-safe fifo queue with maximum length.

Marvin Scholz epirat07 at gmail.com
Mon Dec 13 20:56:47 EET 2021



On 13 Dec 2021, at 17:39, Romain Beauxis wrote:

> This is the second patch of a series of 3 that cleanup and enhance the
> avfoundation implementation for libavdevice.
>
> This patch fixes the concurrency model. Avfoundation runs its own 
> producing thread
> to send produced frames and ffmpeg runs its own thread to consume 
> them.
>
> The existing implementation stores the last transmitted frame and uses 
> a mutex
> to avoid concurrent access. However, this leads to situations where 
> upcoming frames
> can be dropped if the ffmpeg thread is acessing the latest frame. This 
> happens
> even when the thread would otherwise catch up and process frames fast 
> enought.
>
> This patches changes this implementation to use a buffer queue with a 
> max queue length
> and encapsulated thread-safety. This greatly simplifies the logic of 
> the calling code
> and gives the consuming thread a chance to process all frames 
> concurrently to the producing
> thread while avoiding memory leaks.

Couldn't this just use CMSimpleQueue 
https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
or CMBufferQueue?

The implementation of the queue in this patch does not seem right, see 
review below.

>
> Signed-off-by: Romain Beauxis <toots at rastageeks.org>
> ---
> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
> 1 file changed, 127 insertions(+), 93 deletions(-)
>
> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
> index 79c9207cfa..95414fd16a 100644
> --- a/libavdevice/avfoundation.m
> +++ b/libavdevice/avfoundation.m
> @@ -26,7 +26,6 @@
>  */
>
> #import <AVFoundation/AVFoundation.h>
> -#include <pthread.h>
>
> #include "libavutil/channel_layout.h"
> #include "libavutil/pixdesc.h"
> @@ -80,13 +79,97 @@
>     { AV_PIX_FMT_NONE, 0 }
> };
>
> +#define MAX_QUEUED_OBJECTS 10
> +
> + at interface AvdeviceAvfoundationBuffer : NSObject
> ++ (AvdeviceAvfoundationBuffer *) 
> fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
> +- (CMSampleBufferRef) getCMSampleBuffer;
> + at end
> +
> + at implementation AvdeviceAvfoundationBuffer {
> +    CMSampleBufferRef sampleBuffer;
> +}
> +
> ++ (AvdeviceAvfoundationBuffer *) 
> fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
> +}
> +
> +- (id) init:(CMSampleBufferRef)buffer {
> +    sampleBuffer = buffer;
> +    return self;
> +}
> +
> +- (CMSampleBufferRef) getCMSampleBuffer {
> +    return sampleBuffer;
> +}
> + at end
> +
> + at interface AvdeviceAvfoundationBufferQueue : NSObject
> +- (CMSampleBufferRef) dequeue;
> +- (NSUInteger) count;
> +- (void) enqueue:(CMSampleBufferRef)obj;
> + at end
> +
> + at implementation AvdeviceAvfoundationBufferQueue {
> +    NSLock *mutex;
> +    NSMutableArray *queue;
> +}
> +
> +- (id) init {
> +    mutex = [[[NSLock alloc] init] retain];
> +    queue = [[[NSMutableArray alloc] init] retain];
> +    return self;
> +}
> +
> +- (oneway void) release {
> +    NSEnumerator *enumerator = [queue objectEnumerator];
> +    AvdeviceAvfoundationBuffer *buffer;
> +
> +    while (buffer = [enumerator nextObject]) {
> +        CFRelease([buffer getCMSampleBuffer]);
> +    }
> +
> +    [mutex release];
> +    [queue release];
> +}

Shouldn't this be done in dealloc instead of release?
Especially as retain is not subclassed, so this seems
like it could lead to over-releasing resources.

> +
> +- (NSUInteger) count {
> +    [mutex lock];
> +    NSUInteger c = [queue count];
> +    [mutex unlock];
> +    return c;
> +}

This does not look right, the count can change after it is returned
and the caller does not hold a lock to prevent this.

> +
> +- (CMSampleBufferRef) dequeue {
> +    [mutex lock];
> +
> +    if ([queue count] < 1) {
> +      [mutex unlock];
> +      return nil;
> +    }
> +
> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
> +    [queue removeObjectAtIndex:0];
> +    [mutex unlock];
> +
> +    return sampleBuffer;
> +}
> +
> +- (void) enqueue:(CMSampleBufferRef)buffer {
> +    [mutex lock];
> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
> +      [queue removeObjectAtIndex:0];
> +    }
> +    [queue addObject:[AvdeviceAvfoundationBuffer 
> fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
> +    [mutex unlock];
> +}
> + at end
> +
> typedef struct
> {
>     AVClass*        class;
>
> -    int             frames_captured;
> -    int             audio_frames_captured;
> -    pthread_mutex_t frame_lock;
>     id              avf_delegate;
>     id              avf_audio_delegate;
>
> @@ -121,8 +204,8 @@
>     AVCaptureSession         *capture_session;
>     AVCaptureVideoDataOutput *video_output;
>     AVCaptureAudioDataOutput *audio_output;
> -    CMSampleBufferRef         current_frame;
> -    CMSampleBufferRef         current_audio_frame;
> +    AvdeviceAvfoundationBufferQueue *audio_frames;
> +    AvdeviceAvfoundationBufferQueue *video_frames;
>
>     AVCaptureDevice          *observed_device;
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
> @@ -131,16 +214,6 @@
>     int                      observed_quit;
> } AVFContext;
>
> -static void lock_frames(AVFContext* ctx)
> -{
> -    pthread_mutex_lock(&ctx->frame_lock);
> -}
> -
> -static void unlock_frames(AVFContext* ctx)
> -{
> -    pthread_mutex_unlock(&ctx->frame_lock);
> -}
> -
> /** FrameReciever class - delegate for AVCaptureSession
>  */
> @interface AVFFrameReceiver : NSObject
> @@ -218,17 +291,7 @@ - (void)  captureOutput:(AVCaptureOutput 
> *)captureOutput
>   didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>          fromConnection:(AVCaptureConnection *)connection
> {
> -    lock_frames(_context);
> -
> -    if (_context->current_frame != nil) {
> -        CFRelease(_context->current_frame);
> -    }
> -
> -    _context->current_frame = 
> (CMSampleBufferRef)CFRetain(videoFrame);
> -
> -    unlock_frames(_context);
> -
> -    ++_context->frames_captured;
> +    [_context->video_frames enqueue:videoFrame];
> }
>
> @end
> @@ -262,17 +325,7 @@ - (void)  captureOutput:(AVCaptureOutput 
> *)captureOutput
>   didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>          fromConnection:(AVCaptureConnection *)connection
> {
> -    lock_frames(_context);
> -
> -    if (_context->current_audio_frame != nil) {
> -        CFRelease(_context->current_audio_frame);
> -    }
> -
> -    _context->current_audio_frame = 
> (CMSampleBufferRef)CFRetain(audioFrame);
> -
> -    unlock_frames(_context);
> -
> -    ++_context->audio_frames_captured;
> +    [_context->audio_frames enqueue:audioFrame];
> }
>
> @end
> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>     [ctx->capture_session release];
>     [ctx->video_output    release];
>     [ctx->audio_output    release];
> +    [ctx->video_frames    release];
> +    [ctx->audio_frames    release];
>     [ctx->avf_delegate    release];
>     [ctx->avf_audio_delegate release];
>
>     ctx->capture_session = NULL;
>     ctx->video_output    = NULL;
>     ctx->audio_output    = NULL;
> +    ctx->video_frames    = NULL;
> +    ctx->audio_frames    = NULL;
>     ctx->avf_delegate    = NULL;
>     ctx->avf_audio_delegate = NULL;
>
> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>       AudioConverterDispose(ctx->audio_converter);
>       ctx->audio_converter = NULL;
>     }
> -
> -    pthread_mutex_destroy(&ctx->frame_lock);
> -
> -    if (ctx->current_frame) {
> -        CFRelease(ctx->current_frame);
> -    }
> }
>
> static void parse_device_name(AVFormatContext *s)
> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>     }
>
>     // Take stream info from the first frame.
> -    while (ctx->frames_captured < 1) {
> +    while ([ctx->video_frames count] < 1) {
>         CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>     }
>
> -    lock_frames(ctx);
> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>
>     ctx->video_stream_index = stream->index;
>
>     avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>
>     if (image_buffer) {
>         image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>         stream->codecpar->format     = ctx->pixel_format;
>     }
>
> -    CFRelease(ctx->current_frame);
> -    ctx->current_frame = nil;
> -
> -    unlock_frames(ctx);
> +    CFRelease(frame);
>
>     return 0;
> }
> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>     }
>
>     // Take stream info from the first frame.
> -    while (ctx->audio_frames_captured < 1) {
> +    while ([ctx->audio_frames count] < 1) {
>         CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>     }
>
> -    lock_frames(ctx);
> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>
>     ctx->audio_stream_index = stream->index;
>
>     avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> -    format_desc = 
> CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>     const AudioStreamBasicDescription *input_format = 
> CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>
>     if (!input_format) {
> -        unlock_frames(ctx);
> +        CFRelease(frame);
>         av_log(s, AV_LOG_ERROR, "audio format not available\n");
>         return 1;
>     }
>
>     if (input_format->mFormatID != kAudioFormatLinearPCM) {
> -        unlock_frames(ctx);
> +        CFRelease(frame);
>         av_log(s, AV_LOG_ERROR, "only PCM audio format are supported 
> at the moment\n");
>         return 1;
>     }
> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>     if (must_convert) {
>         OSStatus ret = AudioConverterNew(input_format, &output_format, 
> &ctx->audio_converter);
>         if (ret != noErr) {
> -            unlock_frames(ctx);
> +            CFRelease(frame);
>             av_log(s, AV_LOG_ERROR, "Error while allocating audio 
> converter\n");
>             return 1;
>         }
>     }
>
> -    CFRelease(ctx->current_audio_frame);
> -    ctx->current_audio_frame = nil;
> -
> -    unlock_frames(ctx);
> +    CFRelease(frame);
>
>     return 0;
> }
> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>
>     ctx->num_video_devices = [devices count] + [devices_muxed count];
>
> -    pthread_mutex_init(&ctx->frame_lock, NULL);
> -
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>     CGGetActiveDisplayList(0, NULL, &num_screens);
> #endif
> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>
>     // Initialize capture session
>     ctx->capture_session = [[AVCaptureSession alloc] init];
> +    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] 
> init];
> +    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] 
> init];
>
>     if (video_device && add_video_device(s, video_device)) {
>         goto fail;
> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>     AVFContext* ctx = (AVFContext*)s->priv_data;
>
>     do {
> -        CVImageBufferRef image_buffer;
> -        CMBlockBufferRef block_buffer;
> -        lock_frames(ctx);
> -
> -        if (ctx->current_frame != nil) {
> +        if (1 <= [ctx->video_frames count]) {
>             int status;
>             int length = 0;
> -
> -            image_buffer = 
> CMSampleBufferGetImageBuffer(ctx->current_frame);
> -            block_buffer = 
> CMSampleBufferGetDataBuffer(ctx->current_frame);
> +            CMSampleBufferRef video_frame = [ctx->video_frames 
> dequeue];
> +            CVImageBufferRef image_buffer = 
> CMSampleBufferGetImageBuffer(video_frame);;
> +            CMBlockBufferRef block_buffer = 
> CMSampleBufferGetDataBuffer(video_frame);
>
>             if (image_buffer != nil) {
>                 length = (int)CVPixelBufferGetDataSize(image_buffer);
>             } else if (block_buffer != nil) {
>                 length = 
> (int)CMBlockBufferGetDataLength(block_buffer);
>             } else  {
> -                unlock_frames(ctx);
> +                CFRelease(video_frame);
>                 return AVERROR(EINVAL);
>             }
>
>             if (av_new_packet(pkt, length) < 0) {
> -                unlock_frames(ctx);
> +                CFRelease(video_frame);
>                 return AVERROR(EIO);
>             }
>
>             CMItemCount count;
>             CMSampleTimingInfo timing_info;
>
> -            if 
> (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, 
> &timing_info, &count) == noErr) {
> +            if 
> (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, 
> &timing_info, &count) == noErr) {
>                 AVRational timebase_q = av_make_q(1, 
> timing_info.presentationTimeStamp.timescale);
>                 pkt->pts = pkt->dts = 
> av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, 
> avf_time_base_q);
>             }
> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>                     status = AVERROR(EIO);
>                 }
>              }
> -            CFRelease(ctx->current_frame);
> -            ctx->current_frame = nil;
> +            CFRelease(video_frame);
>
>             if (status < 0) {
> -                unlock_frames(ctx);
>                 return status;
>             }
> -        } else if (ctx->current_audio_frame != nil) {
> -            CMBlockBufferRef block_buffer = 
> CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
> +        } else if (1 <= [ctx->audio_frames count]) {
> +            CMSampleBufferRef audio_frame = [ctx->audio_frames 
> dequeue];
> +            CMBlockBufferRef block_buffer = 
> CMSampleBufferGetDataBuffer(audio_frame);
>
>             size_t input_size = 
> CMBlockBufferGetDataLength(block_buffer);
>             int buffer_size = input_size / ctx->audio_buffers;
> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>             UInt32 size = sizeof(output_size);
>             ret = AudioConverterGetProperty(ctx->audio_converter, 
> kAudioConverterPropertyCalculateOutputBufferSize, &size, 
> &output_size);
>             if (ret != noErr) {
> -                unlock_frames(ctx);
> +                CFRelease(audio_frame);
>                 return AVERROR(EIO);
>             }
>
>             if (av_new_packet(pkt, output_size) < 0) {
> -                unlock_frames(ctx);
> +                CFRelease(audio_frame);
>                 return AVERROR(EIO);
>             }
>
> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>
>                     if (ret != kCMBlockBufferNoErr) {
>                         av_free(input_buffer);
> -                        unlock_frames(ctx);
> +                        CFRelease(audio_frame);
>                         return AVERROR(EIO);
>                     }
>                 }
> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>                 av_free(input_buffer);
>
>                 if (ret != noErr) {
> -                    unlock_frames(ctx);
> +                    CFRelease(audio_frame);
>                     return AVERROR(EIO);
>                 }
>
> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>             } else {
>                  ret = CMBlockBufferCopyDataBytes(block_buffer, 0, 
> pkt->size, pkt->data);
>                  if (ret != kCMBlockBufferNoErr) {
> -                     unlock_frames(ctx);
> +                     CFRelease(audio_frame);
>                      return AVERROR(EIO);
>                  }
>             }
> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>             CMItemCount count;
>             CMSampleTimingInfo timing_info;
>
> -            if 
> (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 
> 1, &timing_info, &count) == noErr) {
> +            if 
> (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, 
> &timing_info, &count) == noErr) {
>                 AVRational timebase_q = av_make_q(1, 
> timing_info.presentationTimeStamp.timescale);
>                 pkt->pts = pkt->dts = 
> av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, 
> avf_time_base_q);
>             }
> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, 
> AVPacket *pkt)
>             pkt->stream_index  = ctx->audio_stream_index;
>             pkt->flags        |= AV_PKT_FLAG_KEY;
>
> -            CFRelease(ctx->current_audio_frame);
> -            ctx->current_audio_frame = nil;
> -
> -            unlock_frames(ctx);
> +            CFRelease(audio_frame);
>         } else {
>             pkt->data = NULL;
> -            unlock_frames(ctx);
>             if (ctx->observed_quit) {
>                 return AVERROR_EOF;
>             } else {
>                 return AVERROR(EAGAIN);
>             }
>         }
> -
> -        unlock_frames(ctx);
>     } while (!pkt->data);
>
>     return 0;
> -- 
> 2.30.1 (Apple Git-130)
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".


More information about the ffmpeg-devel mailing list