10#ifdef RDK_THREADSAFE_SSS
11#ifndef CONCURRENT_QUEUE
12#define CONCURRENT_QUEUE
13#include <condition_variable>
// Bounded queue of E whose state is protected by a mutex and a pair of
// condition variables. Elements live in a vector that is indexed by
// d_head / d_tail reduced modulo d_capacity.
19class ConcurrentQueue {
// Number of slots; used as the modulus for slot indexing and in the "full"
// test (d_head + d_capacity == d_tail).
21 unsigned int d_capacity;
// Backing storage for the queued elements.
23 std::vector<E> d_elements;
// d_head is the read position used by pop(), d_tail the write position used
// by push(). d_head == d_tail is treated as "empty".
24 unsigned int d_head, d_tail;
// mutable so that const members (isEmpty, getDone) can still lock it.
25 mutable std::mutex d_lock;
// d_notEmpty is notified by push() and setDone(); d_notFull is notified by
// pop().
26 std::condition_variable d_notEmpty, d_notFull;
// Copy constructor declaration.
// NOTE(review): presumably declared without a definition to forbid copying
// (the class owns a mutex and condition variables) - confirm; `= delete`
// would state that intent directly.
29 ConcurrentQueue<E>(
const ConcurrentQueue<E>&);
// Copy assignment declaration; same remark as for the copy constructor.
30 ConcurrentQueue<E>& operator=(
const ConcurrentQueue<E>&);
// Builds an empty queue with room for `capacity` elements.
// capacity: number of slots to allocate; it also becomes the modulus used
//           when indexing the storage.
// NOTE(review): capacity == 0 is not rejected here, yet push()/pop() compute
// `% d_capacity` - confirm callers never pass 0.
33 ConcurrentQueue<E>(
unsigned int capacity)
// Head and tail both start at 0, which is the "empty" state; d_done starts
// false.
34 : d_capacity(capacity), d_done(false), d_head(0), d_tail(0) {
// Create `capacity` value-initialized elements in a temporary vector...
35 std::vector<E> elements(capacity);
// ...then copy-assign them into the member storage.
36 d_elements = elements;
// Stores a copy of `element` in the slot at the tail of the queue.
// The argument is taken by const reference and is not modified.
42 void push(
const E& element);
// Stores a copy of `element` at the tail slot while holding d_lock.
// element: value to enqueue; copy-assigned into the backing vector.
64void ConcurrentQueue<E>::push(
const E& element) {
// Hold the queue mutex; unique_lock (rather than lock_guard) is the lock
// type a std::condition_variable wait requires.
65 std::unique_lock<std::mutex> lk(d_lock);
// Loop for as long as the queue is full, i.e. the tail is exactly
// d_capacity ahead of the head.
68 while (d_head + d_capacity == d_tail) {
// Record whether the queue was empty before this element is stored.
71 bool wasEmpty = (d_head == d_tail);
// Write into the tail slot; at() is bounds-checked and throws
// std::out_of_range on a bad index.
72 d_elements.at(d_tail % d_capacity) = element;
// Wake every thread waiting on d_notEmpty.
78 d_notEmpty.notify_all();
// Copies the element at the head slot into `element` while holding d_lock.
// element: output parameter that receives the head element.
// NOTE(review): the bool result presumably reports whether an element was
// actually retrieved - confirm against the return statements.
83bool ConcurrentQueue<E>::pop(E& element) {
// Hold the queue mutex; unique_lock is the lock type a
// std::condition_variable wait requires.
84 std::unique_lock<std::mutex> lk(d_lock);
// Loop for as long as the queue is empty (head has caught up with tail).
87 while (d_head == d_tail) {
// Record whether the queue was full before this element is taken.
93 bool wasFull = (d_head + d_capacity == d_tail);
// Read from the head slot; at() is bounds-checked and throws
// std::out_of_range on a bad index.
94 element = d_elements.at(d_head % d_capacity);
// Wake every thread waiting on d_notFull.
100 d_notFull.notify_all();
// Reports whether the queue currently holds no elements.
// Returns true when the head and tail positions coincide.
106bool ConcurrentQueue<E>::isEmpty()
const {
// Lock so that d_head and d_tail are read consistently; this works in a
// const member because d_lock is declared mutable.
107 std::unique_lock<std::mutex> lk(d_lock);
108 return (d_head == d_tail);
// Const accessor that runs under d_lock.
// NOTE(review): presumably returns the d_done flag - confirm.
112bool ConcurrentQueue<E>::getDone()
const {
// Locking in a const member works because d_lock is declared mutable.
113 std::unique_lock<std::mutex> lk(d_lock);
// Runs under d_lock and wakes every thread waiting on d_notEmpty.
// NOTE(review): presumably also sets d_done to true before notifying, so
// that woken consumers can observe it - confirm.
118void ConcurrentQueue<E>::setDone() {
119 std::unique_lock<std::mutex> lk(d_lock);
// notify_all (not notify_one): every thread waiting on d_notEmpty is woken.
121 d_notEmpty.notify_all();
// Runs with the queue mutex held.
125void ConcurrentQueue<E>::clear() {
// Held until the lock object goes out of scope.
126 std::unique_lock<std::mutex> lk(d_lock);