Skip to content

Commit d85f688

Browse files
Zainullin DamirZainullin Damir
authored and committed
++
1 parent d4092d2 commit d85f688

1 file changed

Lines changed: 174 additions & 0 deletions

File tree

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
#pragma once

// #include "bucketAllocator.hpp"
#include "bOutputStorage.hpp"
#include "backoffScheme.hpp"
#include "fastRandomGenerator.hpp"
#include "outputStorage.hpp"
#include "spinlock.hpp"

#include <algorithm>
#include <atomic>
#include <bit>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>
#include <random>
#include <ranges>
#include <thread>

#include <boost/container/static_vector.hpp>
16+
17+
namespace ipxp::output {
18+
19+
class B2OutputStorage : public BOutputStorage {
20+
public:
21+
explicit B2OutputStorage(const uint8_t writersCount) noexcept
22+
: BOutputStorage(writersCount)
23+
{
24+
}
25+
26+
bool storeContainer(ContainerWrapper container, const uint8_t writerIndex) noexcept override
27+
{
28+
WriterData& writerData = m_writersData[writerIndex].get();
29+
const uint16_t containersLeft = writerData.bucketAllocation.containersLeft();
30+
switch (containersLeft) {
31+
case 1:
32+
getNextContainer(writerData.bucketAllocation).assign(container, *m_allocationBuffer);
33+
[[fallthrough]];
34+
case 0:
35+
break;
36+
default:
37+
getNextContainer(writerData.bucketAllocation).assign(container, *m_allocationBuffer);
38+
return true;
39+
}
40+
41+
uint8_t loopCounter = 0;
42+
BackoffScheme backoffScheme(2, std::numeric_limits<std::size_t>::max());
43+
// const uint16_t initialPosition = writerData.writePosition;
44+
do {
45+
const bool overflowed = writerData.randomShift();
46+
d_writerShifts++;
47+
if (overflowed) {
48+
writerData.cachedLowestReaderGeneration = m_lowestReaderGeneration.load();
49+
if (containersLeft == 0) {
50+
container.deallocate(*m_allocationBuffer);
51+
}
52+
d_writerYields++;
53+
backoffScheme.backoff();
54+
}
55+
56+
if (m_buckets[writerData.writePosition].generation
57+
>= writerData.cachedLowestReaderGeneration
58+
|| !BucketAllocation::isValidBucketIndex(
59+
m_buckets[writerData.writePosition].bucketIndex)
60+
|| !m_buckets[writerData.writePosition].lock.try_lock()) {
61+
continue;
62+
}
63+
if (m_buckets[writerData.writePosition].generation
64+
>= writerData.cachedLowestReaderGeneration
65+
|| !BucketAllocation::isValidBucketIndex(
66+
m_buckets[writerData.writePosition].bucketIndex)) {
67+
m_buckets[writerData.writePosition].lock.unlock();
68+
continue;
69+
}
70+
break;
71+
} while (true);
72+
73+
m_buckets[writerData.writePosition].bucketIndex
74+
= writerData.bucketAllocation.reset(m_buckets[writerData.writePosition].bucketIndex);
75+
std::atomic_thread_fence(std::memory_order_release);
76+
77+
writerData.generation = m_highestReaderGeneration + WINDOW_SIZE;
78+
m_buckets[writerData.writePosition].generation = writerData.generation;
79+
80+
m_buckets[writerData.writePosition].lock.unlock();
81+
82+
if (containersLeft == 0) {
83+
getNextContainer(writerData.bucketAllocation).assign(container, *m_allocationBuffer);
84+
}
85+
return true;
86+
}
87+
88+
std::optional<ReferenceCounterHandler<OutputContainer>> getContainer(
89+
const std::size_t readerGroupIndex,
90+
const uint8_t localReaderIndex,
91+
const uint8_t globalReaderIndex) noexcept override
92+
{
93+
ReaderData& readerData = m_readersData[globalReaderIndex].get();
94+
// const uint64_t readPosition = readerData.readPosition;
95+
if (readerData.bucketAllocation.containersLeft()) {
96+
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
97+
getReferenceCounter(getNextContainer(readerData.bucketAllocation)));
98+
}
99+
100+
uint8_t loopCounter = 0;
101+
uint64_t cachedGeneration;
102+
uint16_t cachedBucketIndex;
103+
const uint16_t initialPosition = readerData.readPosition;
104+
do {
105+
readerData.shift(m_readerGroupSizes[readerGroupIndex], localReaderIndex);
106+
d_readerShifts++;
107+
108+
auto& y = m_buckets[readerData.readPosition];
109+
if (readerData.isOnBufferBegin(m_readerGroupSizes[readerGroupIndex])) {
110+
if (!writersPresent()) {
111+
readerData.generation++;
112+
updateLowestReaderGeneration(globalReaderIndex);
113+
return std::nullopt;
114+
}
115+
if (!readerData.seenValidBucket) {
116+
updateLowestReaderGeneration(globalReaderIndex);
117+
std::this_thread::yield();
118+
d_readerYields++;
119+
readerData.skipLoop = true;
120+
return std::nullopt;
121+
}
122+
readerData.generation++;
123+
readerData.seenValidBucket = false;
124+
readerData.skipLoop = false;
125+
updateLowestReaderGeneration(globalReaderIndex);
126+
}
127+
cachedGeneration = m_buckets[readerData.readPosition].generation;
128+
std::atomic_thread_fence(std::memory_order_acquire);
129+
cachedBucketIndex = m_buckets[readerData.readPosition].bucketIndex;
130+
if (cachedGeneration >= readerData.generation + 2) {
131+
readerData.seenValidBucket = true;
132+
}
133+
} while (cachedGeneration != readerData.generation
134+
|| !BucketAllocation::isValidBucketIndex(cachedBucketIndex));
135+
136+
readerData.seenValidBucket = true;
137+
readerData.bucketAllocation.reset(m_buckets[readerData.readPosition].bucketIndex);
138+
139+
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
140+
getReferenceCounter(getNextContainer(readerData.bucketAllocation)));
141+
}
142+
143+
bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
144+
{
145+
return !writersPresent() && getHighestWriterGeneration() < m_lowestReaderGeneration;
146+
}
147+
148+
protected:
149+
void updateLowestReaderGeneration(const uint8_t globalReaderIndex) noexcept
150+
{
151+
boost::container::static_vector<uint64_t, MAX_READERS_COUNT> readerGenerations
152+
= m_readersData
153+
| std::views::transform([](const CacheAlligned<ReaderData>& readerDataAlligned) {
154+
return readerDataAlligned->generation;
155+
})
156+
| std::ranges::to<boost::container::static_vector<uint64_t, MAX_READERS_COUNT>>();
157+
const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
158+
uint64_t expected;
159+
do {
160+
expected = m_highestReaderGeneration.load();
161+
if (highestReaderGeneration <= expected) {
162+
break;
163+
}
164+
} while (m_highestReaderGeneration.compare_exchange_weak(
165+
expected,
166+
highestReaderGeneration,
167+
std::memory_order_release));
168+
// m_highestReaderGeneration = highestReaderGeneration;
169+
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);
170+
m_lowestReaderGeneration = lowestReaderGeneration;
171+
}
172+
};
173+
174+
} // namespace ipxp::output

0 commit comments

Comments
 (0)