sandbox
Loading...
Searching...
No Matches
spsc_queue.hpp
1// SPDX-License-Identifier: MIT
2#ifndef LIBSBX_CONTAINERS_SPSC_QUEUE_HPP_
3#define LIBSBX_CONTAINERS_SPSC_QUEUE_HPP_
4
5#include <atomic>
6
7#include <libsbx/utility/fast_mod.hpp>
8
9#include <libsbx/memory/cache.hpp>
10
11namespace sbx::containers {
12
13template<typename Type, std::size_t Capacity>
15
16public:
17
18 ~spsc_queue() {
19 auto tmp = Type{};
20
21 while (pop(tmp)) {
22
23 }
24 }
25
26 bool push(const Type& value) {
27 const auto head = _head.load(std::memory_order_relaxed);
28 const auto next = increment(head);
29
30 if (next == _tail.load(std::memory_order_acquire)) {
31 return false; // full
32 }
33
34 auto* slot = _slot(head);
35 std::construct_at(slot, value);
36
37 _head.store(next, std::memory_order_release);
38
39 return true;
40 }
41
42 bool pop(Type& out) {
43 const auto tail = _tail.load(std::memory_order_relaxed);
44
45 if (tail == _head.load(std::memory_order_acquire)) {
46 return false;
47 }
48
49 auto* element = _slot(tail);
50 out = std::move(*element);
51 std::destroy_at(element);
52
53 _tail.store(increment(tail), std::memory_order_release);
54
55 return true;
56 }
57
58private:
59
60 static constexpr auto increment(const std::size_t i) -> std::size_t {
61 return utility::fast_mod(i + 1, Capacity);
62 }
63
64 auto _slot(const std::size_t index) noexcept -> Type* {
65 return std::launder(reinterpret_cast<Type*>(_buffer.data()) + index);
66 }
67
68 auto _slot(const std::size_t index) const noexcept -> const Type* {
69 return std::launder(reinterpret_cast<const Type*>(_buffer.data()) + index);
70 }
71
72 alignas(memory::cacheline::size) std::atomic<std::size_t> _head{0};
73 alignas(memory::cacheline::size) std::atomic<std::size_t> _tail{0};
74 alignas(Type) std::array<std::byte, Capacity * sizeof(Type)> _buffer{};
75
76}; // class spsc_queue
77
78} // namespace sbx::containers
79
80#endif // LIBSBX_CONTAINERS_SPSC_QUEUE_HPP_
Definition: spsc_queue.hpp:14