[Libav-user] avio_alloc_context with multiple threads
Victor dMdB
ffmpeg at eml.cc
Mon Jan 2 18:23:53 EET 2017
On Mon, 2 Jan 2017, at 12:47 AM, Charles wrote:
> On 01/01/2017 10:07 AM, Victor dMdB wrote:
> > On Sun, 1 Jan 2017, at 11:04 PM, Charles wrote:
> >> On 01/01/2017 05:22 AM, Victor dMdB wrote:
> >>> If I have multiple streams of distinct data, how would I go about making
> >>> sure data is thread safe in real-time applications?
> >>>
> >>> My code is in C++, so I managed to pass a write function using extern C
> >>> but that makes the function global (ie not thread safe), I tried fixing
> >>> with thread_local but i then ran into other issues.
> >>>
> >>> I then tried using the private data ptr, but then I get a data race,
> >>> because I was just polling with a while loop in another thread, which is
> >>> rather unreliable.
> >>>
> >>> Are there any previous examples of how to do this?
> >>> _______________________________________________
> >>> Libav-user mailing list
> >>> Libav-user at ffmpeg.org
> >>> http://ffmpeg.org/mailman/listinfo/libav-user
> >>
> >> Muxing or Demuxing? Multiple streams are files or actual transport
> >> streams?
> >>
> >> Thanks
> >> Charles
> >
> >
> > In my case it is to mux mp4 files into rtp format, would it be different
> > if it were transport streams?
>
> It could change the way you want to tackle it.
>
> First, to make it thread safe you have to use some form of
> synchronization.
> I use a template LockingQueue.hpp
>
> Code :
> public :
>     void PushItem( T f_ )
>     {
>         std::lock_guard<std::mutex> lock(m_queue_mtx);
>         m_queue.push( f_ );
>     }
>
>     // Caller must make sure the queue is not empty before calling this.
>     T PopItem()
>     {
>         T f_;
>         std::lock_guard<std::mutex> lock(m_queue_mtx);
>         f_ = m_queue.front();
>         m_queue.pop();
>         return f_;
>     }
> protected :
>     std::queue<T> m_queue;
>     std::mutex m_queue_mtx;
>
> You can google "thread safe multiple producers"; there are lots of
> different ways to implement this.
>
> Run the stream out as a thread and give it a SendPacket( AVPacket *
> in_pkt ) method that will PushItem( in_pkt ).
>
> NOTE : You may need to do a memcpy depending on how the packets are
> allocated. Do memcpy before PushItem/SendPacket to prevent additional
> lock time
>
> In the while loop in the thread check the queue Size() and next_pkt =
> PopItem()
> Then do any ts scaling and stream re-ident and
> av_interleaved_write_frame( m_av_out_fmt_ctx, next_pkt );
>
> If the input is already in transport streams, then you could just round
> robin the input sockets and just forward each packet as it comes in.
>
> Thanks
> Charles
>
Ok thanks, I've looked at it a bit more but I seem to still be getting
some memory corruption in my polling thread. So the static function used
with avio_alloc_context might be:
static int write(void* a, uint8_t* b, int c) {
    LockingQueue * queue = reinterpret_cast<LockingQueue *>(a);
    queue->PushItem(std::string( b, b + c ));
    return c;
}
while the thread polling the queue would look like:
void pollQueue( std::shared_ptr<LockingQueue> buffer ){
    while(true){
        LockingQueue * queue = buffer.get();
        if( queue != nullptr ){
            if( queue->size() > 0 ){ // adding a size method to LockingQueue
                std::string pkt = queue->PopItem();
                // Do what i need with pkt
            }
        }
    }
}
And the setup would look something like :
std::shared_ptr<LockingQueue> queue = std::make_shared<LockingQueue>();
std::thread poll = std::thread{ &pollQueue, this, queue };
avio_alloc_context(avioCtxBuf, avioCtxBuf_size, 1, queue.get(),
                   NULL, write, NULL);
poll.detach();
Would this be a thread-safe example?
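
For comparison, one way to avoid the busy-wait in the polling loop is to let the consumer block on a condition variable until data arrives. The following is only a minimal sketch under stated assumptions, not tested against FFmpeg: BlockingQueue, WaitAndPop, Close, write_cb and demo are hypothetical names, the queue is fixed to std::string rather than templated, and the callback is called directly instead of being registered through avio_alloc_context.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical blocking variant of the LockingQueue discussed above.
class BlockingQueue {
public:
    void PushItem(std::string item) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_queue.push(std::move(item));
        }
        m_cv.notify_one();  // wake one waiting consumer
    }

    // Blocks until an item is available or Close() has been called.
    // Returns false only when the queue is closed and fully drained.
    bool WaitAndPop(std::string& out) {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this] { return !m_queue.empty() || m_closed; });
        if (m_queue.empty()) return false;
        out = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    void Close() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_closed = true;
        }
        m_cv.notify_all();
    }

private:
    std::queue<std::string> m_queue;
    std::mutex m_mtx;
    std::condition_variable m_cv;
    bool m_closed = false;
};

// Same shape as the write callback in the post. Some FFmpeg versions
// declare buf as const uint8_t*, so check avio.h for the one in use.
static int write_cb(void* opaque, uint8_t* buf, int buf_size) {
    assert(opaque != nullptr);
    auto* queue = static_cast<BlockingQueue*>(opaque);
    queue->PushItem(std::string(buf, buf + buf_size));
    return buf_size;
}

// Consumer thread body: sleeps inside WaitAndPop instead of spinning.
void pollQueue(BlockingQueue& queue, std::size_t& bytes_seen) {
    std::string pkt;
    while (queue.WaitAndPop(pkt)) {
        bytes_seen += pkt.size();  // placeholder for the real packet handling
    }
}

// Example wiring without FFmpeg: the producer calls write_cb directly.
std::size_t demo() {
    BlockingQueue queue;
    std::size_t bytes_seen = 0;
    std::thread consumer(pollQueue, std::ref(queue), std::ref(bytes_seen));
    uint8_t data[4] = {1, 2, 3, 4};
    write_cb(&queue, data, 4);
    write_cb(&queue, data, 2);
    queue.Close();    // lets the consumer drain the queue and exit
    consumer.join();  // join rather than detach, so shutdown is deterministic
    return bytes_seen;
}
```

In this sketch the emptiness check and the pop happen under the same lock, so there is no separate size() call that could race with a push. Joining the thread after Close() also keeps the queue alive until the consumer has finished, which a detached thread does not guarantee by itself.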