#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>

#pragma warning(disable: 26495) // MSVC code analysis: uninitialized member variable

// A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;
  // ... (the value_type alias, policy-derived constants such as padding_bytes, and the
  //      public interface declarations are elided from this listing; the definitions of
  //      try_push/try_pop appear further below)

  struct padded_entry {
    std::atomic<marked_value> value; // marked_value (alias elided above) is presumably a marked_ptr to the element
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;
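
  // A hedged illustration (not part of the header): the amount of padding per entry is
  // controlled by the padding policy documented in policy.hpp. Assuming that policy is
  // spelled xenium::policy::padding_bytes, extra per-entry padding could be requested
  // roughly like this:
  //
  //   using queue_t = xenium::kirsch_bounded_kfifo_queue<
  //     int*,
  //     xenium::policy::padding_bytes<64>>; // add 64 bytes of padding after each entry's value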

  struct marked_idx {
    marked_idx() = default;
    // ... (a two-argument constructor, called below as marked_idx(value, mark), is elided)

    uint64_t get() const noexcept { return val_ & val_mask; }
    uint64_t mark() const noexcept { return val_ >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->val_ == other.val_; }
    bool operator!=(const marked_idx& other) const noexcept { return this->val_ != other.val_; }

    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t val_ = 0; // packed as (mark << bits) | value
  };
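
  // A minimal illustration (not from the original source) of the encoding implied by
  // get() and mark() above: the low 16 bits hold the ring-buffer index, the remaining
  // bits hold a counter used to avoid ABA issues. With the elided (value, mark)
  // constructor this would behave roughly as follows:
  //
  //   marked_idx idx(/*value=*/5, /*mark=*/3); // val_ == (3u << 16) | 5
  //   assert(idx.get()  == 5);
  //   assert(idx.mark() == 3);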

  // template <bool Empty>
  // (the templated slot-scanning helper declared here is elided from this listing;
  //  its definition fragment appears further below)
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  // ... (declarations of committed, in_valid_region and not_in_valid_region are elided;
  //      their definitions appear below)

  std::uint64_t queue_size_;
  // ... (further members, e.g. the segment size k_ used below, are elided)
  std::atomic<marked_idx> head_;
  std::atomic<marked_idx> tail_;
  std::unique_ptr<entry[]> queue_;
};
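
// A hedged usage sketch (not part of the header). The constructor below takes the
// segment size k and the number of segments, and the public interface of this class is
// try_push(value_type) / try_pop(value_type&). Assuming value_type is simply T for a
// raw-pointer element type, usage could look roughly like this:
//
//   xenium::kirsch_bounded_kfifo_queue<int*> queue(/*k=*/4, /*num_segments=*/16);
//
//   int* v = new int(42);
//   if (queue.try_push(v)) {
//     // the queue now owns v; values still in the queue are deleted by the destructor
//   }
//
//   int* out = nullptr;
//   if (queue.try_pop(out)) {
//     delete out; // ownership returned to the caller
//   }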

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  queue_size_(k * num_segments),
  // ... (initializers for the remaining members are elided from this listing)
  queue_(new entry[k * num_segments]()) // value-initialized, so every slot starts out empty
{}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  // Delete any values that are still in the queue.
  for (unsigned i = 0; i < queue_size_; ++i)
    traits::delete_value(queue_[i].value.load(std::memory_order_relaxed).get());
}

// Tries to push a new element to the queue. Progress guarantees: lock-free.
// Only fragments of this definition survive in the listing; elided code is marked with
// "..." and the original nesting is only approximated.
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  // ... (retry loop header elided)
  marked_idx tail_old = tail_.load(std::memory_order_relaxed);
  marked_idx head_old = head_.load(std::memory_order_relaxed);
  // ... (selection of a free slot index 'idx' in the tail segment elided)
  if (tail_old != tail_.load(std::memory_order_relaxed))
    // ... (tail moved in the meantime)
  // ...
  if (queue_[idx].value.compare_exchange_strong(/* ... */)) // try to claim the free slot
    // ...
      traits::release(value); // successfully enqueued; the queue now owns the value
  // ...
      head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed); // advance head
  } else if (head_old == head_.load(std::memory_order_relaxed)) {
    // ...
    tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed); // advance tail
  // ... (remainder of try_push elided)
}
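
// A hedged reading of the fragments above (not from the original source): following the
// k-FIFO design, a producer claims a free slot in the tail segment with a
// compare_exchange on queue_[idx].value and then lets committed() (below) decide whether
// the enqueue counts. If the tail moved in the meantime it retries; if no free slot is
// found it advances tail_ to the next segment, or, when queue_full() holds, checks
// segment_empty() on the head segment before reporting failure.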

// Only fragments of this definition survive in the listing; elided code is marked with
// "..." and the original nesting is only approximated.
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  // ... (retry loop header elided)
  marked_idx head_old = head_.load(std::memory_order_relaxed);
  marked_idx tail_old = tail_.load(std::memory_order_relaxed);
  // ... (selection of an occupied slot index 'idx' in the head segment elided)
  if (head_old != head_.load(std::memory_order_relaxed))
    // ... (head moved in the meantime)
  // ...
  tail_.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed); // advance tail
  // ...
  if (queue_[idx].value.compare_exchange_strong(/* ... */)) // take the value out of its slot
    // ... (on success the element is handed back via 'result')
  // ...
  head_.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed); // advance head
  // ...
}
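
// A hedged reading of the fragments above (not from the original source): a consumer
// looks for an occupied slot in the head segment and empties it with a compare_exchange
// on queue_[idx].value. If the head moved in the meantime it retries, and the
// compare_exchange calls on head_ and tail_ shown above move the queue on to the next
// segment once the current one is exhausted.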

template <class T, class... Policies>
template <bool Empty>
// ... (name and signature of this templated helper are elided from this listing; the
//      surviving fragment scans the k_ slots of a segment)
  for (size_t i = 0; i < k_; i++) {
    // ...
    old = queue_[index].value.load(std::memory_order_acquire);
    // ...
  }
  // ...

// Only fragments of this definition survive in the listing; elided code is marked
// with "...".
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(
  const marked_idx& tail_old, marked_value value, uint64_t index) {
  if (queue_[index].value.load(std::memory_order_relaxed) != value) { /* ... */ }

  marked_idx tail_current = tail_.load(std::memory_order_relaxed);
  marked_idx head_current = head_.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // ...
  } else if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // roll the enqueue back: reset the slot to an empty value with an incremented mark
    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) { /* ... */ }
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) { /* ... */ }

    marked_value new_value(nullptr, value.mark() + 1);
    if (!queue_[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) { /* ... */ }
  }
  // ...
}
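
// A hedged note (not part of the original source): committed() is the enqueuer's
// post-check. tail_old is the tail position observed when the slot at 'index' was
// claimed; if that position has meanwhile fallen out of the valid window between head
// and tail, the slot is reset to (nullptr, mark + 1). Bumping the mark keeps a
// concurrent compare_exchange on the same slot from succeeding with a stale
// expectation, the usual ABA-avoidance role of the mark bits.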

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(
  const marked_idx& head_old, const marked_idx& tail_old) const {
  // Full if the segment following the tail segment would run into the head segment.
  if (((tail_old.get() + k_) % queue_size_) == head_old.get() &&
      (head_old == head_.load(std::memory_order_relaxed)))
    return true;
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  // A segment is empty if none of its k_ slots currently holds a value.
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < k_; i++) {
    if (queue_[(start + i) % queue_size_].value.load(std::memory_order_acquire).get() != nullptr)
      return false;
  }
  return true;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(uint64_t tail_old,
  uint64_t tail_current, uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return head_current < tail_old && tail_old <= tail_current;
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(uint64_t tail_old,
  uint64_t tail_current, uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around)
    return tail_old < tail_current || head_current < tail_old;
  return tail_old < tail_current && head_current < tail_old;
}
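
// A worked example (not from the original source) of how these two predicates behave,
// as they are used by committed() above. The arguments are (tail_old, tail_current,
// head_current):
//
//   head_current = 2, tail_current = 6, tail_old = 4    // no wrap-around
//     in_valid_region(4, 6, 2)     -> 2 < 4 && 4 <= 6   -> true  (enqueue stands)
//
//   head_current = 6, tail_current = 2, tail_old = 7    // tail wrapped around
//     in_valid_region(7, 2, 6)     -> 6 < 7 || 7 <= 2   -> true  (enqueue stands)
//
//   head_current = 4, tail_current = 6, tail_old = 3    // head already passed tail_old
//     in_valid_region(3, 6, 4)     -> 4 < 3 && 3 <= 6   -> false
//     not_in_valid_region(3, 6, 4) -> 3 < 6 || 4 < 3    -> true  (the slot is rolled back)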

// ... (the remainder of the file, including the closing #endif of the include guard, is
//      elided from this listing)

// Doxygen cross-reference summaries that accompanied the original listing:
//  - entry_size (declared in an elided part of the class above): the effective size of a
//    single queue entry, including padding.
//  - marked_ptr (marked_ptr.hpp): a pointer with an embedded mark/tag value.
//  - hash (hash.hpp): slim wrapper around std::hash with a specialization for pointer types.
//  - padding policy (policy.hpp): configures the number of padding bytes added to each
//    entry in kirsch_kfifo_queue and kirsch_bounded_kfifo_queue.