28 namespace seqan3::contrib
43 template <
typename TSpec =
void>
46 template <
typename TSpec = Tag<
void>>
49 template <
typename TValue,
typename TSpec = Suspendable<>>
50 class ConcurrentQueue;
53 using Limit = Tag<Limit_>;
55 template <
typename TValue,
typename TSpec>
56 class ConcurrentQueue<TValue, Suspendable<TSpec> >
60 typedef typename TString::size_type TSize;
86 assert(writerCount == 0u);
89 while (readerCount != 0u)
94 template <
typename TValue>
95 class ConcurrentQueue<TValue, Suspendable<Limit> >:
96 public ConcurrentQueue<TValue, Suspendable<> >
99 typedef ConcurrentQueue<TValue, Suspendable<> > TBase;
100 typedef typename TBase::TString TString;
101 typedef typename TBase::TSize TSize;
105 ConcurrentQueue(TSize maxSize):
108 this->data.resize(maxSize);
113 ConcurrentQueue(ConcurrentQueue
const & other):
114 TBase((TBase const &)other)
118 template <
typename TValue,
typename TSpec>
120 lockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
123 template <
typename TValue,
typename TSpec>
125 unlockReading(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
129 if (--me.readerCount != 0u)
132 me.less.notify_all();
135 template <
typename TValue,
typename TSpec>
137 lockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > &)
140 template <
typename TValue,
typename TSpec>
142 unlockWriting(ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
146 if (--me.writerCount != 0u)
149 me.more.notify_all();
152 template <
typename TValue,
typename TSize,
typename TSpec>
154 setReaderCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize readerCount)
157 me.readerCount = readerCount;
160 template <
typename TValue,
typename TSize,
typename TSpec>
162 setWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize writerCount)
165 me.writerCount = writerCount;
168 template <
typename TValue,
typename TSize1,
typename TSize2,
typename TSpec>
170 setReaderWriterCount(ConcurrentQueue<TValue, Suspendable<TSpec> > & me, TSize1 readerCount, TSize2 writerCount)
173 me.readerCount = readerCount;
174 me.writerCount = writerCount;
177 template <
typename TValue,
typename TSize,
typename TSpec>
179 waitForMinSize(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
183 while (me.occupied < minSize && me.writerCount > 0u)
185 return me.occupied >= minSize;
188 template <
typename TValue,
typename TSpec>
190 empty(ConcurrentQueue<TValue, Suspendable<TSpec> >
const & me)
192 return me.occupied == 0;
195 template <
typename TValue,
typename TSpec>
196 inline typename ConcurrentQueue<TValue, Suspendable<TSpec> >::SizeType
197 length(ConcurrentQueue<TValue, Suspendable<TSpec> >
const & me)
202 template <
typename TValue,
typename TSpec>
204 _popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
207 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
208 typedef typename TQueue::TString TString;
209 typedef typename TString::size_type TSize;
211 TSize cap = me.data.size();
213 while (me.occupied == 0u && me.writerCount > 0u)
216 if (me.occupied == 0u)
219 assert(me.occupied > 0u);
223 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.front));
227 me.front = (me.front + 1) % cap;
239 template <
typename TValue,
typename TSpec>
241 _popBack(TValue & result,
242 ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
245 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
246 typedef typename TQueue::TString TString;
247 typedef typename TString::size_type TSize;
249 TSize cap = me.data.size();
251 while (me.occupied == 0u && me.writerCount > 0u)
254 if (me.occupied == 0u)
257 assert(me.occupied > 0u);
259 me.back = (me.back + cap - 1) % cap;
263 result = std::ranges::iter_move(std::ranges::next(me.data.begin(), me.back));
278 template <
typename TValue,
typename TSpec>
280 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
283 return _popFront(result, me, lock);
286 template <
typename TValue>
288 popFront(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
292 if (!_popFront(result, me, lk))
295 me.less.notify_all();
299 template <
typename TValue,
typename TSpec>
301 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<TSpec> > & me)
304 return _popBack(result, me, lk);
307 template <
typename TValue>
309 popBack(TValue & result, ConcurrentQueue<TValue, Suspendable<Limit> > & me)
313 if (!_popBack(result, me, lk))
316 me.less.notify_all();
321 template <
typename TValue,
typename TValue2,
typename TSpec,
typename TExpand>
323 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
325 [[maybe_unused]] Tag<TExpand> expandTag)
327 typedef ConcurrentQueue<TValue, Suspendable<TSpec> > TQueue;
328 typedef typename TQueue::TString TString;
329 typedef typename TString::size_type TSize;
333 TSize cap = me.data.size();
335 if (me.occupied >= cap)
340 me.data.resize(cap + 1);
341 TSize delta = me.data.size() - cap;
347 std::ranges::move_backward(std::span{me.data.data() + me.front, me.data.data() + cap},
348 me.data.data() + me.data.size());
349 if (me.occupied != 0 && me.back <= me.front)
356 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
357 me.back = (me.back + 1) % cap;
367 me.more.notify_all();
371 template <
typename TValue,
typename TValue2,
typename TSpec,
typename TExpand>
373 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
375 Tag<TExpand> expandTag);
377 template <
typename TValue,
typename TValue2>
379 appendValue(ConcurrentQueue<TValue, Suspendable<Limit> > & me,
383 typedef ConcurrentQueue<TValue, Suspendable<Limit> > TQueue;
384 typedef typename TQueue::TString TString;
385 typedef typename TString::size_type TSize;
389 TSize cap = me.data.size();
391 while (me.occupied >= cap && me.readerCount > 0u)
394 if (me.occupied >= cap)
397 assert(me.occupied < cap);
400 *std::ranges::next(me.data.begin(), me.back) = std::forward<TValue2>(val);
401 me.back = (me.back + 1) % cap;
410 me.more.notify_all();
414 template <
typename TValue,
typename TValue2,
typename TSpec>
416 appendValue(ConcurrentQueue<TValue, Suspendable<TSpec> > & me,
419 return appendValue(me, std::forward<TValue2>(val), TSpec{});
The <ranges> header from C++20's standard library.
Provides std::span from the C++20 standard library.