kirsch_kfifo_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>     // assert in segment's destructor
#include <cstdint>
#include <stdexcept>
#include <type_traits> // std::conditional_t, std::remove_pointer_t

namespace xenium {
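
// Usage sketch (illustrative only; `my_queue` is a hypothetical name, and the
// generic_epoch_based reclaimer from <xenium/reclamation/generic_epoch_based.hpp>
// is just one choice for the mandatory reclaimer policy):
//
//   using my_queue = xenium::kirsch_kfifo_queue<
//     int*,
//     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>>;
//
//   my_queue queue(4); // k == 4 -> four slots per segment
//   queue.push(new int(42));
//   int* value = nullptr;
//   if (queue.try_pop(value))
//     delete value;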
/**
 * An unbounded lock-free multi-producer/multi-consumer k-FIFO queue.
 */
template <class T, class... Policies>
class kirsch_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = kirsch_kfifo_queue<T, NewPolicies..., Policies...>;

  explicit kirsch_kfifo_queue(uint64_t k);
  ~kirsch_kfifo_queue();

  kirsch_kfifo_queue(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue(kirsch_kfifo_queue&&) = delete;

  kirsch_kfifo_queue& operator=(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue& operator=(kirsch_kfifo_queue&&) = delete;

  /**
   * Pushes the given value to the queue; `value` must not be null.
   */
  void push(value_type value);

  /**
   * Tries to pop an element from the queue.
   * @return `true` if an element was popped, `false` if the queue was empty.
   */
  [[nodiscard]] bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /**
   * Provides the effective size of a single queue entry (including padding).
   */
  static constexpr std::size_t entry_size = sizeof(entry);

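  // For example, with a pointer-based value_type on a typical 64-bit platform
  // and the default padding_bytes == sizeof(raw_value_type) == 8, each entry
  // occupies sizeof(std::atomic<marked_value>) + 8 == 16 bytes.
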
private:
  struct segment;

  struct segment_deleter {
    void operator()(segment* seg) const { release_segment(seg); }
  };
  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
    using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;

    explicit segment(uint64_t k) : k(k) {}
    ~segment() {
      for (unsigned i = 0; i < k; ++i) {
        assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
      }
    }

    void delete_remaining_items() {
      for (unsigned i = 0; i < k; ++i) {
        traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
        items()[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }

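    // The k entries are not data members of segment itself: alloc_segment
    // places them directly behind the segment object in a single allocation,
    // so `this + 1` is the address of the first entry.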
    entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }

    std::atomic<bool> deleted{false};
    const uint64_t k;
    concurrent_ptr next{};
  };

  using concurrent_ptr = typename segment::concurrent_ptr;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  segment* alloc_segment() const;
  static void release_segment(segment* seg);

  template <bool Empty>
  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
  void advance_tail(marked_ptr tail_current) noexcept;
  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;

  const std::size_t k_;
  concurrent_ptr head_;
  concurrent_ptr tail_;
};

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) :
  k_(k)
{
  const auto seg = alloc_segment();
  head_.store(seg, std::memory_order_relaxed);
  tail_.store(seg, std::memory_order_relaxed);
}

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
  auto seg = head_.load(std::memory_order_relaxed).get();
  while (seg) {
    auto next = seg->next.load(std::memory_order_relaxed).get();
    seg->delete_remaining_items();
    release_segment(seg);
    seg = next;
  }
}

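// A segment and its k entries are allocated as a single contiguous block;
// the segment header is placement-constructed at the front and the k entries
// directly behind it, which is the layout segment::items() relies on.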
template <class T, class... Policies>
auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
  auto result = new(data) segment(k_);
  for (std::size_t i = 0; i < k_; ++i)
    new(&result->items()[i]) entry();
  return result;
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
  seg->~segment();
  ::operator delete(seg);
}

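// push: read the current tail segment, pick a random empty slot and try to
// CAS the value into it; committed() then verifies that the segment was not
// concurrently removed from the queue. If the tail segment is full, the tail
// is advanced (appending a new segment if necessary) and the push retries.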
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value cannot be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  guard_ptr tail_old;
  for (;;) {
    // (1) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    tail_old.acquire(tail_, std::memory_order_acquire);

    // TODO - local linearizability

    uint64_t idx = 0;
    marked_value old_value;
    const bool found_idx = find_index<true>(tail_old, idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue;

    if (found_idx) {
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes-with the acquire-CAS (5)
      if (tail_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
            std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        // TODO - local linearizability
        return;
      }
    } else {
      advance_tail(tail_old);
    }
  }
}

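// try_pop: pick a random occupied slot in the current head segment and try to
// CAS it back to null (with an incremented mark to avoid ABA). If the head
// segment is empty, the queue is either empty or the head is advanced to the
// next segment and the pop retries.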
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr head_old;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (10)
    head_old.acquire(head_, std::memory_order_acquire);
    uint64_t idx = 0;
    marked_value old_value;
    const bool found_idx = find_index<false>(head_old, idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue;

    // (4) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    marked_ptr tail_old = tail_.load(std::memory_order_acquire);
    if (found_idx) {
      if (head_old.get() == tail_old.get())
        advance_tail(tail_old);

      const marked_value new_value(nullptr, old_value.mark() + 1);
      // (5) - this acquire-CAS synchronizes-with the release-CAS (2)
      if (head_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
            std::memory_order_acquire, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false; // queue is empty
      advance_head(head_old, tail_old);
    }
  }
}

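// Scans the k slots of the given segment, starting at a random index to reduce
// contention. With Empty == true it looks for a free slot (push), otherwise
// for an occupied one (pop); on success it reports the slot and its current
// value via value_index and old.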
template <class T, class... Policies>
template <bool Empty>
bool kirsch_kfifo_queue<T, Policies...>::find_index(
  marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept
{
  const uint64_t k = segment->k;
  const uint64_t random_index = utils::random() % k;
  for (size_t i = 0; i < k; i++) {
    uint64_t index = ((random_index + i) % k);
    old = segment->items()[index].value.load(std::memory_order_relaxed);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

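// After a successful insert-CAS, committed() verifies that the value is
// actually visible to pops, i.e., that the segment was not concurrently
// removed from the queue. If the segment has been removed, the pusher tries
// to take the value back out: if that removal-CAS succeeds the push did not
// commit and push() retries; if it fails, a pop already consumed the value
// and the push counts as committed.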
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::committed(
  marked_ptr segment, marked_value value, uint64_t index) noexcept
{
  if (value != segment->items()[index].value.load(std::memory_order_relaxed))
    return true;

  const marked_value empty_value(nullptr, value.mark() + 1);

  if (segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into has already been removed from the queue.
    // We are fine if the element has also been removed in the meantime, i.e.,
    // if our removal-CAS fails.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  // (6) - this acquire-load synchronizes-with the release-CAS (10)
  marked_ptr head_current = head_.load(std::memory_order_acquire);
  if (segment.get() == head_current.get()) {
    // The segment we inserted into has become the head segment.
    marked_ptr new_head(head_current.get(), head_current.mark() + 1);
    // This relaxed-CAS is part of a release sequence headed by (10).
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      // We are fine if we can update head and thus fail any concurrent
      // advance_head attempts.
      return true;

    // Otherwise we are fine only if the element has already been removed.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  if (!segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into has not been removed, so the value is
    // visible to pops.
    return true;
  }

  // Head and tail have moved beyond this segment. Try to remove the item;
  // we are fine only if it has already been removed by a pop.
  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
}

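// Advances head_ to the next segment once the current head segment contains
// no more items, marking the old segment as deleted and handing it to the
// configured reclamation scheme. If head and tail refer to the same segment,
// the tail is moved along first so it never falls behind the head.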
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
  // (7) - this acquire-load synchronizes-with the release-CAS (13)
  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
  if (head_current != head_.load(std::memory_order_relaxed))
    return;

  if (head_current.get() == tail_current.get()) {
    // (8) - this acquire-load synchronizes-with the release-CAS (13)
    const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
    if (tail_next_segment.get() == nullptr)
      return;

    if (tail_current == tail_.load(std::memory_order_relaxed)) {
      marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
      // (9) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    }
  }

  head_current->deleted.store(true, std::memory_order_relaxed);

  marked_ptr expected = head_current;
  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
  // (10) - this release-CAS synchronizes-with the acquire-load (3, 6)
  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
    head_current.reclaim();
  }
}

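// Advances tail_ to the successor of the given segment, appending a freshly
// allocated segment first if no successor exists yet. Only one thread can win
// the next-CAS (13); a losing thread releases the segment it speculatively
// allocated.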
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
  // (11) - this acquire-load synchronizes-with the release-CAS (13)
  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
  if (tail_current != tail_.load(std::memory_order_relaxed))
    return;

  if (next_segment.get() != nullptr) {
    marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
    // (12) - this release-CAS synchronizes-with the acquire-load (1, 4)
    tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
  } else {
    auto seg = alloc_segment();
    const marked_ptr new_segment(seg, next_segment.mark() + 1);
    // TODO - insert own value to simplify push?
    // (13) - this release-CAS synchronizes-with the acquire-load (7, 8, 11)
    if (tail_current->next.compare_exchange_strong(next_segment, new_segment,
          std::memory_order_release, std::memory_order_relaxed)) {
      marked_ptr new_tail(seg, tail_current.mark() + 1);
      // (14) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    } else {
      release_segment(seg);
    }
  }
}
} // namespace xenium

#endif