Skip to content

Commit d24f68e

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent bcd5e2a commit d24f68e

3 files changed

Lines changed: 125 additions & 3 deletions

File tree

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#pragma once
2+
3+
#include "backoffScheme.hpp"
4+
#include "mcOutputStorage.hpp"
5+
#include "outputStorage.hpp"
6+
7+
namespace ipxp::output {
8+
9+
class MC2OutputStorage : public MCOutputStorage {
10+
public:
11+
explicit MC2OutputStorage(const uint8_t writersCount) noexcept
12+
: MCOutputStorage(writersCount)
13+
{
14+
}
15+
16+
bool storeContainer(ContainerWrapper container, const uint8_t writerId) noexcept override
17+
{
18+
Queue& queue = m_queues[writerId];
19+
const std::size_t writeIndex = queue.enqueCount % queue.storage.size();
20+
if (queue.enqueCount >= queue.storage.size()
21+
&& queue.enqueCount - queue.storage.size() >= queue.cachedLowestHeadIndex) {
22+
d_writerUpdated++;
23+
queue.cachedLowestHeadIndex = queue.lowestHeadIndex();
24+
}
25+
BackoffScheme backoffScheme(10, std::numeric_limits<std::size_t>::max());
26+
while (queue.enqueCount >= queue.storage.size()
27+
&& queue.enqueCount - queue.storage.size() >= queue.cachedLowestHeadIndex) {
28+
/*d_deallocatedContainers++;
29+
container.deallocate(*m_allocationBuffer);
30+
std::this_thread::yield();
31+
return false;*/
32+
backoffScheme.backoff();
33+
queue.cachedLowestHeadIndex = queue.lowestHeadIndex();
34+
}
35+
36+
std::atomic_thread_fence(std::memory_order_seq_cst);
37+
queue.storage[writeIndex].assign(container, *m_allocationBuffer);
38+
std::atomic_thread_fence(std::memory_order_seq_cst);
39+
queue.enqueCount++;
40+
return true;
41+
}
42+
43+
std::optional<ReferenceCounterHandler<OutputContainer>> getContainer(
44+
const std::size_t readerGroupIndex,
45+
const uint8_t localReaderIndex,
46+
const uint8_t globalReaderIndex) noexcept override
47+
{
48+
ReaderData& readerData = m_readersData[globalReaderIndex].get();
49+
if (readerData.lastReadSuccessful) {
50+
m_queues[readerData.lastQueueIndex % m_queues.size()]
51+
.groupData[readerGroupIndex]
52+
->confirmedIndex++;
53+
}
54+
55+
if (readerData.shiftQueue) {
56+
readerData.shiftQueue = false;
57+
readerData.lastQueueIndex++;
58+
// readerData.cachedEnqueCount = 0;
59+
}
60+
for (uint8_t queueShifts = 0; queueShifts < m_totalWritersCount; queueShifts++) {
61+
const uint8_t currentQueueIndex = readerData.lastQueueIndex % m_queues.size();
62+
Queue& queue = m_queues[currentQueueIndex];
63+
queue.sync(readerGroupIndex);
64+
const std::size_t dequeCount = queue.groupData[readerGroupIndex]->dequeueCount++;
65+
const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
66+
const std::size_t d_enqueCount = queue.enqueCount.load();
67+
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
68+
readerData.cachedEnqueCounts[currentQueueIndex] = queue.enqueCount;
69+
}
70+
const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
71+
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
72+
queue.groupData[readerGroupIndex]->dequeueCount--;
73+
readerData.lastQueueIndex++;
74+
readerData.readWithoutShift = 0;
75+
// readerData.cachedEnqueCount = 0;
76+
continue;
77+
}
78+
readerData.readWithoutShift++;
79+
// TODO originally was 256
80+
bool d_s = false;
81+
if (readerData.readWithoutShift == queue.storage.size()) {
82+
shiftAllQueues();
83+
d_s = true;
84+
}
85+
std::atomic_thread_fence(std::memory_order_seq_cst);
86+
const std::size_t readIndex
87+
= queue.groupData[readerGroupIndex]->headIndex++ % queue.storage.size();
88+
std::atomic_thread_fence(std::memory_order_seq_cst);
89+
90+
auto& y = queue.groupData[readerGroupIndex];
91+
if (readerData.cachedEnqueCounts[currentQueueIndex] > queue.enqueCount) {
92+
throw std::runtime_error("XXXXX");
93+
}
94+
if (queue.storage[readIndex].empty()) {
95+
throw std::runtime_error("Should not happen");
96+
}
97+
if (queue.storage[readIndex].getContainer().readTimes == 4) {
98+
throw std::runtime_error("Bad read times");
99+
}
100+
101+
readerData.lastReadSuccessful = true;
102+
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
103+
getReferenceCounter(queue.storage[readIndex]));
104+
}
105+
d_nulloptsReturned++;
106+
readerData.lastReadSuccessful = false;
107+
std::this_thread::yield();
108+
return std::nullopt;
109+
}
110+
111+
bool finished(const std::size_t readerGroupIndex) noexcept override
112+
{
113+
return !writersPresent()
114+
&& std::ranges::all_of(m_queues, [&](const Queue& queue) { return queue.finished(); });
115+
}
116+
117+
private:
118+
};
119+
120+
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/mcOutputStorage.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class MCOutputStorage : public OutputStorage {
157157
&& std::ranges::all_of(m_queues, [&](const Queue& queue) { return queue.finished(); });
158158
}
159159

160-
private:
160+
protected:
161161
struct ReaderData {
162162
// uint64_t cachedEnqueCount {0};
163163
std::atomic<uint16_t> readWithoutShift {0};

tests/performance/outputStorage/testOutputStorage.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <outputStorage/ffq2OutputStorage.hpp>
1414
#include <outputStorage/ffqOutputStorage.hpp>
1515
#include <outputStorage/lfnbOutputStorage.hpp>
16+
#include <outputStorage/mc2OutputStorage.hpp>
1617
#include <outputStorage/mcOutputStorage.hpp>
1718
#include <outputStorage/mqOutputStorage.hpp>
1819
#include <outputStorage/ringOutputStorage.hpp>
@@ -183,10 +184,11 @@ void makePerformanceTest(std::string_view storageName)
183184

184185
TEST(TestOutputStorage, XXX)
185186
{
187+
makePerformanceTest<ipxp::output::MC2OutputStorage>("MC2OutputStorage");
188+
makePerformanceTest<ipxp::output::MCOutputStorage>("MCOutputStorage");
186189
makePerformanceTest<ipxp::output::B2OutputStorage>("B2OutputStorage");
187190
makePerformanceTest<ipxp::output::BOutputStorage>("BOutputStorage");
188191
makePerformanceTest<ipxp::output::FFQ2OutputStorage>("FFQ2OutputStorage");
189-
makePerformanceTest<ipxp::output::MCOutputStorage>("MCOutputStorage");
190192
makePerformanceTest<ipxp::output::LFNBOutputStorage>("LFNBOutputStorage");
191193
makePerformanceTest<ipxp::output::FFQOutputStorage>("FFQOutputStorage");
192194

@@ -222,7 +224,7 @@ TEST(TestOutputStorage, Debug)
222224
for (const auto testIndex : std::views::iota(0, 100)) {
223225
std::cout << " Debug Loop Iteration " << testIndex << "\n";
224226
// makePerformanceTest<ipxp::output::MCOutputStorage>("MCOutputStorage");
225-
stressTest<ipxp::output::B2OutputStorage>(false);
227+
stressTest<ipxp::output::MC2OutputStorage>(false);
226228
}
227229
}
228230

0 commit comments

Comments
 (0)