[FFmpeg-devel] [PATCH v3 02/03] libavdevice/avfoundation.m: Replace mutex-based concurrency handling in avfoundation.m by a thread-safe fifo queue with maximum length.

Romain Beauxis toots at rastageeks.org
Wed Dec 15 02:18:39 EET 2021


This is the second patch in a series of 3 that clean up and enhance the
avfoundation implementation for libavdevice.

Changes since last version:
* Switched queue implementation to CMSimpleQueue

This patch fixes the concurrency model. Avfoundation runs its own producing thread
to send produced frames and ffmpeg runs its own thread to consume them.

The existing implementation stores the last transmitted frame and uses a mutex
to avoid concurrent access. However, this leads to situations where upcoming frames
can be dropped if the ffmpeg thread is accessing the latest frame. This happens
even when the thread would otherwise catch up and process frames fast enough.

This patch changes this implementation to use a buffer queue with a max queue length
and encapsulated thread-safety. This greatly simplifies the logic of the calling code
and gives the consuming thread a chance to process all frames concurrently with the producing
thread while avoiding memory leaks.

Signed-off-by: Romain Beauxis <toots at rastageeks.org>
---
libavdevice/avfoundation.m | 169 +++++++++++++++++--------------------
1 file changed, 76 insertions(+), 93 deletions(-)

diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
index 79c9207cfa..b602cfbe95 100644
--- a/libavdevice/avfoundation.m
+++ b/libavdevice/avfoundation.m
@@ -26,7 +26,7 @@
 */

#import <AVFoundation/AVFoundation.h>
-#include <pthread.h>
+#import <CoreMedia/CoreMedia.h>

#include "libavutil/channel_layout.h"
#include "libavutil/pixdesc.h"
@@ -80,13 +80,12 @@
    { AV_PIX_FMT_NONE, 0 }
};

+#define MAX_QUEUED_FRAMES 10
+
typedef struct
{
    AVClass*        class;

-    int             frames_captured;
-    int             audio_frames_captured;
-    pthread_mutex_t frame_lock;
    id              avf_delegate;
    id              avf_audio_delegate;

@@ -121,8 +120,8 @@
    AVCaptureSession         *capture_session;
    AVCaptureVideoDataOutput *video_output;
    AVCaptureAudioDataOutput *audio_output;
-    CMSampleBufferRef         current_frame;
-    CMSampleBufferRef         current_audio_frame;
+    CMSimpleQueueRef          audio_frames_queue;
+    CMSimpleQueueRef          video_frames_queue;

    AVCaptureDevice          *observed_device;
#if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
@@ -131,16 +130,6 @@
    int                      observed_quit;
} AVFContext;

-static void lock_frames(AVFContext* ctx)
-{
-    pthread_mutex_lock(&ctx->frame_lock);
-}
-
-static void unlock_frames(AVFContext* ctx)
-{
-    pthread_mutex_unlock(&ctx->frame_lock);
-}
-
/** FrameReciever class - delegate for AVCaptureSession
 */
@interface AVFFrameReceiver : NSObject
@@ -218,17 +207,8 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
  didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
         fromConnection:(AVCaptureConnection *)connection
{
-    lock_frames(_context);
-
-    if (_context->current_frame != nil) {
-        CFRelease(_context->current_frame);
-    }
-
-    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
-
-    unlock_frames(_context);
-
-    ++_context->frames_captured;
+    CFRetain(videoFrame);
+    CMSimpleQueueEnqueue(_context->video_frames_queue, videoFrame);
}

@end
@@ -262,17 +242,8 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
  didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
         fromConnection:(AVCaptureConnection *)connection
{
-    lock_frames(_context);
-
-    if (_context->current_audio_frame != nil) {
-        CFRelease(_context->current_audio_frame);
-    }
-
-    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
-
-    unlock_frames(_context);
-
-    ++_context->audio_frames_captured;
+    CFRetain(audioFrame);
+    CMSimpleQueueEnqueue(_context->audio_frames_queue, audioFrame);
}

@end
@@ -287,6 +258,30 @@ static void destroy_context(AVFContext* ctx)
    [ctx->avf_delegate    release];
    [ctx->avf_audio_delegate release];

+    CMSampleBufferRef frame;
+
+    if (ctx->video_frames_queue) {
+        frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->video_frames_queue);
+        do {
+          CFRelease(frame);
+          frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->video_frames_queue);
+        } while (frame);
+
+        CFRelease(ctx->video_frames_queue);
+        ctx->video_frames_queue = NULL;
+    }
+
+    if (ctx->audio_frames_queue) {
+        frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->audio_frames_queue);
+        do {
+          CFRelease(frame);
+          frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->audio_frames_queue);
+        } while (frame);
+
+        CFRelease(ctx->audio_frames_queue);
+        ctx->audio_frames_queue = NULL;
+    }
+
    ctx->capture_session = NULL;
    ctx->video_output    = NULL;
    ctx->audio_output    = NULL;
@@ -297,12 +292,6 @@ static void destroy_context(AVFContext* ctx)
      AudioConverterDispose(ctx->audio_converter);
      ctx->audio_converter = NULL;
    }
-
-    pthread_mutex_destroy(&ctx->frame_lock);
-
-    if (ctx->current_frame) {
-        CFRelease(ctx->current_frame);
-    }
}

static void parse_device_name(AVFormatContext *s)
@@ -630,18 +619,18 @@ static int get_video_config(AVFormatContext *s)
    }

    // Take stream info from the first frame.
-    while (ctx->frames_captured < 1) {
+    while (CMSimpleQueueGetCount(ctx->video_frames_queue) < 1) {
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
    }

-    lock_frames(ctx);
+    CMSampleBufferRef frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->video_frames_queue);

    ctx->video_stream_index = stream->index;

    avpriv_set_pts_info(stream, 64, 1, avf_time_base);

-    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
-    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
+    image_buffer = CMSampleBufferGetImageBuffer(frame);
+    block_buffer = CMSampleBufferGetDataBuffer(frame);

    if (image_buffer) {
        image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
@@ -657,10 +646,7 @@ static int get_video_config(AVFormatContext *s)
        stream->codecpar->format     = ctx->pixel_format;
    }

-    CFRelease(ctx->current_frame);
-    ctx->current_frame = nil;
-
-    unlock_frames(ctx);
+    CFRelease(frame);

    return 0;
}
@@ -680,27 +666,27 @@ static int get_audio_config(AVFormatContext *s)
    }

    // Take stream info from the first frame.
-    while (ctx->audio_frames_captured < 1) {
+    while (CMSimpleQueueGetCount(ctx->audio_frames_queue) < 1) {
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
    }

-    lock_frames(ctx);
+    CMSampleBufferRef frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->audio_frames_queue);

    ctx->audio_stream_index = stream->index;

    avpriv_set_pts_info(stream, 64, 1, avf_time_base);

-    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
+    format_desc = CMSampleBufferGetFormatDescription(frame);
    const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);

    if (!input_format) {
-        unlock_frames(ctx);
+        CFRelease(frame);
        av_log(s, AV_LOG_ERROR, "audio format not available\n");
        return 1;
    }

    if (input_format->mFormatID != kAudioFormatLinearPCM) {
-        unlock_frames(ctx);
+        CFRelease(frame);
        av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
        return 1;
    }
@@ -778,16 +764,13 @@ static int get_audio_config(AVFormatContext *s)
    if (must_convert) {
        OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
        if (ret != noErr) {
-            unlock_frames(ctx);
+            CFRelease(frame);
            av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
            return 1;
        }
    }

-    CFRelease(ctx->current_audio_frame);
-    ctx->current_audio_frame = nil;
-
-    unlock_frames(ctx);
+    CFRelease(frame);

    return 0;
}
@@ -805,8 +788,6 @@ static int avf_read_header(AVFormatContext *s)

    ctx->num_video_devices = [devices count] + [devices_muxed count];

-    pthread_mutex_init(&ctx->frame_lock, NULL);
-
#if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
    CGGetActiveDisplayList(0, NULL, &num_screens);
#endif
@@ -1007,6 +988,19 @@ static int avf_read_header(AVFormatContext *s)
    // Initialize capture session
    ctx->capture_session = [[AVCaptureSession alloc] init];

+    OSStatus ret;
+    ret = CMSimpleQueueCreate(kCFAllocatorDefault, MAX_QUEUED_FRAMES, &ctx->video_frames_queue);
+   
+    if (ret != noErr) {
+        goto fail;
+    }
+
+    ret = CMSimpleQueueCreate(kCFAllocatorDefault, MAX_QUEUED_FRAMES, &ctx->audio_frames_queue); 
+
+    if (ret != noErr) {
+        goto fail;
+    }
+
    if (video_device && add_video_device(s, video_device)) {
        goto fail;
    }
@@ -1088,35 +1082,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
    AVFContext* ctx = (AVFContext*)s->priv_data;

    do {
-        CVImageBufferRef image_buffer;
-        CMBlockBufferRef block_buffer;
-        lock_frames(ctx);
-
-        if (ctx->current_frame != nil) {
+        if (1 <= CMSimpleQueueGetCount(ctx->video_frames_queue)) {
            int status;
            int length = 0;
-
-            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
-            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
+            CMSampleBufferRef video_frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->video_frames_queue);
+            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
+            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);

            if (image_buffer != nil) {
                length = (int)CVPixelBufferGetDataSize(image_buffer);
            } else if (block_buffer != nil) {
                length = (int)CMBlockBufferGetDataLength(block_buffer);
            } else  {
-                unlock_frames(ctx);
+                CFRelease(video_frame);
                return AVERROR(EINVAL);
            }

            if (av_new_packet(pkt, length) < 0) {
-                unlock_frames(ctx);
+                CFRelease(video_frame);
                return AVERROR(EIO);
            }

            CMItemCount count;
            CMSampleTimingInfo timing_info;

-            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
+            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
            }
@@ -1133,15 +1123,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
                    status = AVERROR(EIO);
                }
             }
-            CFRelease(ctx->current_frame);
-            ctx->current_frame = nil;
+            CFRelease(video_frame);

            if (status < 0) {
-                unlock_frames(ctx);
                return status;
            }
-        } else if (ctx->current_audio_frame != nil) {
-            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
+        } else if (1 <= CMSimpleQueueGetCount(ctx->audio_frames_queue)) {
+            CMSampleBufferRef audio_frame = (CMSampleBufferRef)CMSimpleQueueDequeue(ctx->audio_frames_queue);
+            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);

            size_t input_size = CMBlockBufferGetDataLength(block_buffer);
            int buffer_size = input_size / ctx->audio_buffers;
@@ -1151,12 +1140,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            UInt32 size = sizeof(output_size);
            ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
            if (ret != noErr) {
-                unlock_frames(ctx);
+                CFRelease(audio_frame);
                return AVERROR(EIO);
            }

            if (av_new_packet(pkt, output_size) < 0) {
-                unlock_frames(ctx);
+                CFRelease(audio_frame);
                return AVERROR(EIO);
            }

@@ -1173,7 +1162,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)

                    if (ret != kCMBlockBufferNoErr) {
                        av_free(input_buffer);
-                        unlock_frames(ctx);
+                        CFRelease(audio_frame);
                        return AVERROR(EIO);
                    }
                }
@@ -1191,7 +1180,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
                av_free(input_buffer);

                if (ret != noErr) {
-                    unlock_frames(ctx);
+                    CFRelease(audio_frame);
                    return AVERROR(EIO);
                }

@@ -1199,7 +1188,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            } else {
                 ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
                 if (ret != kCMBlockBufferNoErr) {
-                     unlock_frames(ctx);
+                     CFRelease(audio_frame);
                     return AVERROR(EIO);
                 }
            }
@@ -1207,7 +1196,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            CMItemCount count;
            CMSampleTimingInfo timing_info;

-            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
+            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
            }
@@ -1215,21 +1204,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            pkt->stream_index  = ctx->audio_stream_index;
            pkt->flags        |= AV_PKT_FLAG_KEY;

-            CFRelease(ctx->current_audio_frame);
-            ctx->current_audio_frame = nil;
-
-            unlock_frames(ctx);
+            CFRelease(audio_frame);
        } else {
            pkt->data = NULL;
-            unlock_frames(ctx);
            if (ctx->observed_quit) {
                return AVERROR_EOF;
            } else {
                return AVERROR(EAGAIN);
            }
        }
-
-        unlock_frames(ctx);
    } while (!pkt->data);

    return 0;
-- 
2.30.1 (Apple Git-130)



More information about the ffmpeg-devel mailing list