Skip to content

Commit 20a072c

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent d47d8f9 commit 20a072c

6 files changed

Lines changed: 85 additions & 8 deletions

File tree

cmake/dependencies.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ find_package(Unwind REQUIRED)
88
find_package(LZ4 REQUIRED)
99
find_package(OpenSSL REQUIRED)
1010
find_package(Boost REQUIRED)
11+
find_package(NUMA REQUIRED)
1112

1213
if (ENABLE_INPUT_PCAP)
1314
pkg_check_modules(PCAP REQUIRED libpcap)
@@ -19,7 +20,6 @@ endif()
1920

2021
if (ENABLE_INPUT_NFB)
2122
find_package(NFB REQUIRED)
22-
find_package(NUMA REQUIRED)
2323
endif()
2424

2525
if (ENABLE_OUTPUT_UNIREC OR ENABLE_NEMEA)

include/ipfixprobe/outputPlugin/outputStorage/outputStorageReader.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "outputStorage.hpp"
4+
#include "threadAffinitySetter.hpp"
45

56
#include <memory>
67

include/ipfixprobe/outputPlugin/outputStorage/outputStorageWriter.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include "outputStorage.hpp"
4+
#include "threadAffinitySetter.hpp"
45

56
#include <memory>
67

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
3+
#include "threadUtils.hpp"
4+
5+
#include <iostream>
6+
#include <memory>
7+
#include <vector>
8+
9+
#include <numa.h>
10+
11+
namespace ipxp::output {
12+
13+
class ThreadAffinitySetter {
14+
public:
15+
static void setNumaNode(const size_t nodeIndex)
16+
{
17+
const uint16_t threadIndex = getThreadId();
18+
const std::size_t numaIndex = nodeIndex % m_architectureInfo.cpusByNumaNode.size();
19+
const std::size_t cpuIndex
20+
= threadIndex % m_architectureInfo.cpusByNumaNode[numaIndex].size();
21+
const int cpuToBind = m_architectureInfo.cpusByNumaNode[numaIndex][cpuIndex];
22+
23+
cpu_set_t cpuset;
24+
CPU_ZERO(&cpuset);
25+
CPU_SET(cpuToBind, &cpuset);
26+
const pthread_t current_thread = pthread_self();
27+
const int errCode = pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
28+
if (errCode != 0) {
29+
throw std::system_error(errCode, std::generic_category(), "Failed to pin thread");
30+
}
31+
}
32+
33+
private:
34+
struct ArchitectureInfo {
35+
ArchitectureInfo()
36+
{
37+
if (numa_available() == -1) {
38+
cpusByNumaNode
39+
= {std::views::iota(0, static_cast<int>(std::thread::hardware_concurrency()))
40+
| std::ranges::to<std::vector>()};
41+
return;
42+
}
43+
44+
const auto deleter = [](bitmask* mask) {
45+
if (mask)
46+
numa_free_cpumask(mask);
47+
};
48+
const int maxNodes = numa_max_node();
49+
for (int currentNode = 0; currentNode <= maxNodes; currentNode++) {
50+
std::unique_ptr<bitmask, decltype(deleter)> mask(numa_allocate_cpumask(), deleter);
51+
52+
if (numa_node_to_cpus(currentNode, mask.get()) != 0) {
53+
throw std::runtime_error(
54+
"Failed to get CPU mask for NUMA node " + std::to_string(currentNode));
55+
}
56+
auto cpusOfNode = std::views::iota(0, (int) mask->size)
57+
| std::views::filter([&](int cpuIndex) {
58+
return numa_bitmask_isbitset(mask.get(), cpuIndex);
59+
})
60+
| std::ranges::to<std::vector<int>>();
61+
cpusByNumaNode.push_back(std::move(cpusOfNode));
62+
}
63+
}
64+
65+
std::vector<std::vector<int>> cpusByNumaNode;
66+
};
67+
68+
static inline ArchitectureInfo m_architectureInfo;
69+
};
70+
71+
} // namespace ipxp::output

tests/performance/outputStorage/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ target_link_libraries(testOutputStorage PRIVATE
1717
GTest::gtest
1818
GTest::gtest_main
1919
ipfixprobe-core
20+
numa::numa
2021
)
2122

2223
add_test(NAME testOutputStorage COMMAND testOutputStorage)

tests/performance/outputStorage/testOutputStorage.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <outputStorage/mq2OutputStorage.hpp>
1919
#include <outputStorage/mqOutputStorage.hpp>
2020
#include <outputStorage/ringOutputStorage.hpp>
21+
#include <outputStorage/threadAffinitySetter.hpp>
2122
// #include <outputStorage/serializedOutputStorage.hpp>
2223
// #include <outputStorage/serializedOutputStorageBlocking.hpp>
2324
#include <outputStorage/outputStorageRegistrar.hpp>
@@ -67,6 +68,7 @@ void makeTest(
6768
for (DummyReader& reader : readerGroup) {
6869
readContainers.back().emplace_back(
6970
std::async(std::launch::async, [&reader, &readersLatch]() {
71+
ipxp::output::ThreadAffinitySetter::setNumaNode(0);
7072
readersLatch.count_down();
7173
return reader.readContainers();
7274
}));
@@ -76,12 +78,14 @@ void makeTest(
7678
readersLatch.wait();
7779
std::vector<std::future<void>> writerFutures
7880
= writers | std::views::transform([&](DummyWriter<OutputStorageType>& writer) {
79-
return std::async(std::launch::async, [&]() { writer.writeContainers(); });
81+
return std::async(std::launch::async, [&]() {
82+
ipxp::output::ThreadAffinitySetter::setNumaNode(0);
83+
writer.writeContainers();
84+
});
8085
})
8186
| std::ranges::to<std::vector<std::future<void>>>();
8287

8388
std::ranges::for_each(writerFutures, [](std::future<void>& future) { future.get(); });
84-
// writers.clear();
8589

8690
const std::vector<std::size_t> containersReadInGroups
8791
= readContainers
@@ -164,10 +168,6 @@ void shortTestLoop(const bool immitateWork)
164168
template<typename OutputStorageType>
165169
void makePerformanceTest(std::string_view storageName)
166170
{
167-
std::cout << "==========================================================" << std::endl;
168-
std::cout << storageName << ", 32 Writers, 4 Group 8 Reader\n";
169-
makeTest<OutputStorageType>(32, {8, 8, 8, 8}, false, 20'000'000);
170-
171171
std::cout << "==========================================================" << std::endl;
172172
std::cout << storageName << ", 1 Writers, 1 Reader\n";
173173
makeTest<OutputStorageType>(1, {1}, false, 80'000'000);
@@ -188,11 +188,14 @@ void makePerformanceTest(std::string_view storageName)
188188
std::cout << storageName << ", 4 Writers, 4 Group 1 Reader\n";
189189
makeTest<OutputStorageType>(4, {1, 1, 1, 1}, false, 80'000'000);
190190

191-
std::cout << std::endl;
191+
std::cout << "==========================================================" << std::endl;
192+
std::cout << storageName << ", 32 Writers, 4 Group 8 Reader\n";
193+
makeTest<OutputStorageType>(32, {8, 8, 8, 8}, false, 20'000'000);
192194
}
193195

194196
TEST(TestOutputStorage, XXX)
195197
{
198+
ipxp::output::ThreadAffinitySetter::setNumaNode(0);
196199
std::cout << "==========================================================" << std::endl;
197200
std::cout << "MQ2OutputStorage, 1 Writers, 1 Reader\n";
198201
makeTest<ipxp::output::MQ2OutputStorage<void*>>(1, {1}, false, 80'000'000);

0 commit comments

Comments
 (0)